2020import nu .sitia .loggenerator .filter .GapDetectionFilter ;
2121import nu .sitia .loggenerator .filter .ProcessFilter ;
2222import nu .sitia .loggenerator .inputitems .InputItem ;
23+ import nu .sitia .loggenerator .inputitems .TemplateFileInputItem ;
2324import nu .sitia .loggenerator .outputitems .OutputItem ;
2425import nu .sitia .loggenerator .templates .Template ;
2526import nu .sitia .loggenerator .templates .TemplateFactory ;
2627import nu .sitia .loggenerator .templates .TimeTemplate ;
2728import nu .sitia .loggenerator .util .LogStatistics ;
2829import sun .misc .Signal ;
2930
30- import java .util .Date ;
31- import java .util .LinkedList ;
32- import java .util .List ;
31+ import java .util .*;
3332import java .util .logging .Logger ;
33+ import java .util .stream .Collectors ;
3434
3535/**
3636 * This is the moderator that takes input, modifies the input and writes to the output.
3939public class ItemProxy {
4040 static final Logger logger = Logger .getLogger (ItemProxy .class .getName ());
4141
42- /** The input item */
43- private final InputItem input ;
44- /** The output item */
45- private final OutputItem output ;
46-
4742 /** Filters to apply to each element processed */
48- private final List <ProcessFilter > filterList ;
43+ private final List <ProcessItem > itemList ;
4944
5045 /** Preferred eps */
5146 private final double eps ;
@@ -68,80 +63,73 @@ public class ItemProxy {
6863 /** If we have a gapDetector and the flag -cgd is true, then show
6964 * gaps every time the statistics has been printed.
7065 */
71- private ProcessFilter gapDetector = null ;
66+ private List <ProcessItem > gapDetectors ;
67+
68+ /** The configuration object */
69+ private Configuration config ;
70+
71+ private void emitMessage (String message ) {
72+ List <String > result = Arrays .asList (message );
73+ for (ProcessItem item : itemList ) {
74+ if (ProcessFilter .class .isInstance (item )) {
75+ ProcessFilter filter = (ProcessFilter ) item ;
76+ result = filter .filter (result );
77+ } else if (OutputItem .class .isInstance (item )) {
78+ OutputItem output = (OutputItem ) item ;
79+ output .write (result );
80+ }
81+ }
82+ }
7283
7384 /**
7485 * Default constructor
75- * @param input Input to use
76- * @param output Output to use
86+ * @param itemList A list of inputs, filters and outputs
7787 * @param config The configuration
7888 */
79- public ItemProxy (InputItem input , OutputItem output , List <ProcessFilter > filterList , Configuration config ) {
80- this .input = input ;
81- this .output = output ;
82- this .filterList = filterList ;
83- String isStatisticsString = config .getValue ("-s" );
84- if (isStatisticsString != null && isStatisticsString .equalsIgnoreCase ("true" )) {
89+ public ItemProxy (List <ProcessItem > itemList , Configuration config ) {
90+ this .config = config ;
91+ this .itemList = itemList ;
92+ if (config .isStatistics ()) {
8593 statistics = new LogStatistics (config );
8694 } else {
8795 statistics = null ;
8896 }
8997
90- String epsString = config .getValue ("-e" );
91- if (null != epsString ) {
92- this .eps = Double .parseDouble (epsString );
93- } else {
94- this .eps = 0 ;
95- }
98+ this .eps = config .getEps ();
9699
97- String limitString = config .getValue ("-l" );
98- if (null != limitString ) {
99- this .limit = Long .parseLong (limitString );
100- } else {
101- this .limit = 0 ;
102- }
103-
104- String templateString = config .getValue ("-t" );
100+ this .limit = config .getLimit ();
105101
106- if (templateString != null ) {
107- Template template = TemplateFactory .getTemplate (templateString );
102+ List <ProcessItem > templates =
103+ itemList .stream ().filter (item -> TemplateFileInputItem .class .isInstance (item )).collect (Collectors .toList ());
104+ long tempTime = -1 ;
105+ for (ProcessItem item : templates ) {
106+ TemplateFileInputItem templateFileInputItem = (TemplateFileInputItem ) item ;
107+ Template template = templateFileInputItem .getTemplate ();
108108 if (TimeTemplate .class .isInstance (template )) {
109- endTime = ((TimeTemplate )template ).getTime ();
109+ TimeTemplate tt = (TimeTemplate ) template ;
110+ tempTime = tempTime > tt .getTime () || tempTime == -1 ? tt .getTime () : tempTime ;
110111 }
111112 }
112-
113- // order might be important
114- if (ShutdownHandler .class .isInstance (input )) {
115- shutdownHandlers .add ((ShutdownHandler ) input );
116- }
117- shutdownHandlers .addAll (getShutdownHandlers (filterList ));
118- if (ShutdownHandler .class .isInstance (output )) {
119- shutdownHandlers .add ((ShutdownHandler ) output );
113+ if (tempTime != -1 ) {
114+ endTime = tempTime ;
120115 }
121116
122- String isContinuousGapDetectionString = config .getValue ("-cgd" );
123- if (isContinuousGapDetectionString != null && isContinuousGapDetectionString .equalsIgnoreCase ("true" )) {
124- gapDetector = getGapDetector (filterList );
125- if (null == gapDetector ) {
126- throw new RuntimeException ("The flag -cgd cannot be used without a GapDetector (-gd)" );
127- }
128- }
117+ itemList .forEach (item -> shutdownHandlers .add ((ShutdownHandler ) item ));
129118
119+ gapDetectors = getGapDetectors (itemList );
130120
131121 this .sentEvents = 0 ;
132122
133123 // Ctrl-C
134124 Signal .handle (new Signal ("INT" ), // SIGINT
135125 signal -> {
136126 System .out .println ("Sigint" );
137- if (statistics != null ) {
127+ if (this . statistics != null ) {
138128 statistics .calculateStatistics (Configuration .END_TRANSACTION );
129+ // Send end message (maybe filtered)
130+ emitMessage (Configuration .END_TRANSACTION_TEXT );
139131 }
140- if (output .printTransactionMessages ()) {
141- output .write (filterOutput (Configuration .END_TRANSACTION ));
142- }
143- input .teardown ();
144- output .teardown ();
132+ itemList .forEach (item -> item .teardown ());
145133 shutdownHandlers .forEach (ShutdownHandler ::shutdown );
146134 System .exit (-1 );
147135 });
@@ -154,10 +142,14 @@ public ItemProxy(InputItem input, OutputItem output, List<ProcessFilter> filterL
154142 * @param filterList a List<filter> to search
155143 * @return GapDetectionFilter or null
156144 */
157- private ProcessFilter getGapDetector (List <ProcessFilter >filterList ) {
158- return filterList .stream ().reduce (null , (_sofar , element ) ->
159- element instanceof GapDetectionFilter ? element : null
160- );
145+ private List <ProcessItem > getGapDetectors (List <ProcessItem >filterList ) {
146+ List <ProcessItem > result = new ArrayList <>();
147+ filterList .stream ().forEach (f -> {
148+ if (GapDetectionFilter .class .isInstance (f )) {
149+ result .add (f );
150+ }
151+ });
152+ return result ;
161153 }
162154
163155 /**
@@ -169,64 +161,66 @@ private ProcessFilter getGapDetector(List<ProcessFilter>filterList) {
169161 */
170162 public void pump () {
171163 logger .fine ("ItemProxy starting up..." );
172- input .setup ();
173- output .setup ();
164+ itemList .forEach (item -> item .setup ());
174165
175166 if (statistics != null ) {
176167 statistics .setTransactionStart (new Date ().getTime ());
168+ logger .finest ("Transaction start: " + statistics .getTransactionStart ());
177169 }
178- if (output .printTransactionMessages ()) {
179- output .write (filterOutput (Configuration .BEGIN_TRANSACTION ));
180- }
170+ List <String > messages = new ArrayList <>();
181171
172+ // When we don't have any more input items that report hasNext true,
173+ // then we are done.
174+ boolean hasNext = true ;
175+ // If we have a statistics object, then we want to send a start message
176+ boolean firstTime = true ;
182177 logger .finer ("ItemProxy pumping messages..." );
183178 // Grab inputs as long as we have input and the limit is not reached and the time limit has not been reached
184- while (input .hasNext () && (limit == 0 || sentEvents < limit ) && (endTime == 0 || new Date ().getTime () < endTime )) {
185- // Assume we have no filters
186- List <String > filtered = input .next ();
187- List <String > toSend = filterOutput (filtered );
188- // in case of a batch of logs that will become more than the limit of logs
189- while (limit != 0 && sentEvents + toSend .size () > limit ) {
190- toSend .remove (toSend .size ()-1 ); // remove last entry
179+ while (hasNext && (limit == 0 || sentEvents < limit ) && (endTime == 0 || new Date ().getTime () < endTime )) {
180+ hasNext = false ;
181+ for (ProcessItem item : itemList ) {
182+ if (InputItem .class .isInstance (item )) {
183+ InputItem input = (InputItem ) item ;
184+ if (input .hasNext ()) {
185+ hasNext = true ;
186+ List <String > next = input .next ();
187+ messages .addAll (next );
188+ }
189+ } else if (OutputItem .class .isInstance (item )) {
190+ OutputItem output = (OutputItem ) item ;
191+ output .write (messages );
192+ } else if (ProcessFilter .class .isInstance (item )) {
193+ ProcessFilter filter = (ProcessFilter ) item ;
194+ messages = filter .filter (messages );
195+ }
196+ if (firstTime && statistics != null && messages .size () > 0 ) {
197+ // Send start message (maybe filtered)
198+ emitMessage (Configuration .BEGIN_TRANSACTION_TEXT );
199+ firstTime = false ;
200+ }
191201 }
192- output .write (toSend );
193- sentEvents += toSend .size ();
194-
202+ sentEvents += messages .size ();
195203 if (statistics != null ) {
196- boolean hasPrinted = statistics .calculateStatistics (filtered );
197- if (hasPrinted && null != gapDetector ) {
204+ boolean hasPrinted = statistics .calculateStatistics (messages );
205+ if (hasPrinted && gapDetectors . size () > 0 ) {
198206 // Also, print the gapDetection periodically
199- System .out .println (((GapDetectionFilter )gapDetector ).getDetector ().toString ());
207+ gapDetectors .forEach (gapDetector -> {
208+ System .out .println (((GapDetectionFilter )gapDetector ).getDetector ().toString ());
209+ });
200210 }
201211 }
202212 // Should we throttle the output to lower the eps?
203213 throttle (statistics );
214+ messages .clear ();
204215 }
205216 if (statistics != null ) {
217+ emitMessage (Configuration .END_TRANSACTION_TEXT );
206218 statistics .calculateStatistics (Configuration .END_TRANSACTION );
207219 }
208- if (output .printTransactionMessages ()) {
209- output .write (filterOutput (Configuration .END_TRANSACTION ));
210- }
211- input .teardown ();
212- output .teardown ();
220+ itemList .forEach (item -> item .teardown ());
213221 shutdownHandlers .forEach (ShutdownHandler ::shutdown );
214222 }
215223
216- /**
217- * ItemProxy want's to write transaction messages to the stream, but
218- * the user might have added a filter for that. Run all filters
219- * on that data
220- * @param toFilter the data to filter
221- * @return the filtered data (empty list or the argument)
222- */
223- private List <String > filterOutput (List <String > toFilter ) {
224- for (ProcessFilter filter : filterList ) {
225- // We had at least one filter. Process:
226- toFilter = filter .filter (toFilter );
227- }
228- return toFilter ;
229- }
230224
231225 /**
232226 * When eps is limited, throttle by using Thread.sleep()
0 commit comments