/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/**
 * IANA IPFIX abstract data types (RFC 7011 / IANA IPFIX information element
 * registry) together with the rules for converting a field's on-the-wire byte
 * representation into a String.
 *
 * Numeric types carry their canonical byte width; IPFIX "reduced-size
 * encoding" permits a field to be transmitted with fewer bytes than that
 * width, so {@link #convertToString(byte[], int)} takes the actual field
 * length from the template rather than assuming the canonical width.
 */
public enum IANA_IPFIX_TYPES {
    BOOLEAN,
    // Integral types. NOTE: the second constructor argument is true for the
    // UNSIGNED types and false for the SIGNED ones — it is an "unsigned" flag.
    unsigned64(8, true),
    unsigned32(4, true),
    unsigned16(2, true),
    unsigned8(1, true),
    signed64(8, false),
    signed32(4, false),
    signed16(2, false),
    signed8(1, false),
    float64,
    float32,
    // strings and special types that get special treatment
    string,
    octetArray,
    ipv4Address,
    ipv6Address,
    macAddress,
    // date and time related, which are decoded as unsigned numbers
    dateTimeSeconds(4, true, true),
    dateTimeMilliseconds(8, true, true),
    dateTimeMicroseconds(8, true, true),
    dateTimeNanoseconds(8, true, true),
    // only here as a holder for compatibility
    Netflow9,
    // extended structures
    basicList,
    subTemplateList,
    subTemplateMultiList;

    /** Strings in IPFIX are UTF-8 encoded (RFC 7011 section 6.1.6). */
    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;

    /** Canonical number of bytes for numeric types; 0 for non-numeric types. */
    private final int bytes;
    /** True for the dateTime* types, which are numbers with time semantics. */
    private final boolean date;
    /** True when the value is decoded without sign extension. */
    private final boolean unsigned;

    IANA_IPFIX_TYPES() {
        this(0, false, false);
    }

    IANA_IPFIX_TYPES(int bytes, boolean unsigned) {
        this(bytes, unsigned, false);
    }

    IANA_IPFIX_TYPES(int bytes, boolean unsigned, boolean date) {
        this.bytes = bytes;
        this.unsigned = unsigned;
        this.date = date;
    }

    /**
     * Interpret the first {@code c} bytes of {@code b} as an unsigned
     * big-endian integer.
     *
     * @param b the raw field bytes
     * @param c how many bytes to consume; an out-of-range count (non-positive
     *          or larger than the buffer) falls back to the whole array so we
     *          never read past the end of the buffer
     * @return the accumulated value (0 for an empty buffer)
     */
    public static long bytesToLong(byte[] b, int c) {
        if (c <= 0 || c > b.length) {
            c = b.length; // defensive clamp: honour a valid reduced size, never overrun
        }
        long result = 0;
        for (int i = 0; i < c; i++) {
            result = (result << 8) | (b[i] & 0xFF);
        }
        return result;
    }

    /**
     * Build a string representation of the raw field bytes according to this
     * type's decoding rules.
     *
     * @param dst the raw field bytes as read from the data record
     * @param len the field length declared by the template; numeric types use
     *            it to support reduced-size encoding, other types ignore it
     * @return the decoded value, or "" for an IP address of invalid length
     */
    public String convertToString(byte[] dst, int len) {
        if (this == ipv4Address) {
            try {
                return Inet4Address.getByAddress(dst).getHostAddress();
            } catch (UnknownHostException e) {
                // thrown when dst is not exactly 4 bytes — treat as unparseable
                return "";
            }
        } else if (this == ipv6Address) {
            try {
                return Inet6Address.getByAddress(dst).getHostAddress();
            } catch (UnknownHostException e) {
                // thrown when dst is not exactly 16 bytes — treat as unparseable
                return "";
            }
        } else if (this == BOOLEAN) {
            // IPFIX encodes true as 1; guard against a zero-length field
            return (dst.length > 0 && dst[0] > 0) ? "true" : "false";
        } else if (this == macAddress) {
            StringBuilder sb = new StringBuilder(18);
            for (byte b : dst) {
                if (sb.length() > 0) {
                    sb.append(':');
                }
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } else if (this == float32) {
            return String.valueOf(ByteBuffer.wrap(dst).getFloat());
        } else if (this == float64) {
            return String.valueOf(ByteBuffer.wrap(dst).getDouble());
        } else if (bytes != 0) {
            // Integer number or date. The template's field length wins over the
            // canonical width because IPFIX allows reduced-size encoding.
            int count = (len > 0 && len <= dst.length) ? len : dst.length;
            long value = bytesToLong(dst, count);
            if (!unsigned && count > 0 && count < 8) {
                // two's complement sign extension for the signed* types
                int shift = Long.SIZE - 8 * count;
                value = (value << shift) >> shift;
            }
            return String.valueOf(value);
        } else {
            return new String(dst, UTF8_CHARSET);
        }
    }

    /** @return true for integral (non-floating, non-date) numeric types */
    public boolean isInteger() {
        return !date && bytes != 0;
    }

    /** @return true for the IEEE-754 floating point types */
    public boolean isFloat() {
        return this == float32 || this == float64;
    }

    /** @return true for the dateTime* types */
    public boolean isDate() {
        return date;
    }
}
+ */ +package org.apache.nifi.netflow; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; + +import org.apache.commons.codec.binary.Hex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NetflowParser { + + protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public class NetflowField { + public NetflowField(int type, int len, int en) { + this.type = type; + this.length = len; + this.en = en; + } + + public int type; + public int length; + public int en; + + public String getName() { + if (this.en != 0) { + return IANA_IPFIX.fromCode(type).name() + "_" + String.valueOf(this.en); + } + return IANA_IPFIX.fromCode(type).name(); + } + + @Override + public String toString() { + return String.format("Field [name=%s, type=%d, en=%d, length=%d]", + new Object[] { getName(), type, en, length }); + } + + public String convertToString(byte[] dst) { + IANA_IPFIX iana = IANA_IPFIX.fromCode(type); + return iana.getType().convertToString(dst, length); + } + } + + public class NetflowTemplate { + public NetflowTemplateKey key; + public List fields = new ArrayList<>(); + + public NetflowTemplate(NetflowTemplateKey key) { + this.key = key; + } + + public void add(NetflowField field) { + this.fields.add(field); + } + + @Override + public String toString() { + return String.format("NetflowTemplate [key=%s,fields=%d]", new Object[] { key.id, fields.size() }); + } + } + + public class NetflowTemplateKey { + public NetflowTemplateKey(int sourceId, int domainId, int templateId) { + this.sourceId = sourceId; + 
this.domainId = domainId; + this.id = templateId; + } + + /** + * The device sending the flow + */ + public int sourceId; + /** + * Observation domain id in IPFIX protocol + */ + public int domainId; + /** + * The template identifier + */ + public int id; + + @Override + public String toString() { + return String.format("NetflowTemplateKey [sourceId=%x, domainId=%x, id=%x]", + new Object[] { sourceId, domainId, id }); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + // result = prime * result + getOuterType().hashCode(); + result = prime * result + id; + result = prime * result + domainId; + result = prime * result + sourceId; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + NetflowTemplateKey other = (NetflowTemplateKey) obj; + if (!getOuterType().equals(other.getOuterType())) + return false; + if (id != other.id) + return false; + if (domainId != other.domainId) + return false; + if (sourceId != other.sourceId) + return false; + return true; + } + + private NetflowParser getOuterType() { + return NetflowParser.this; + } + } + + public class NetflowRecord extends HashMap implements Map { + private static final long serialVersionUID = 1L; + private NetflowTemplate template; + + public NetflowTemplate getTemplate() { + return template; + } + + public void setTemplate(NetflowTemplate template) { + this.template = template; + } + } + + private DataInputStream ins; + private ConcurrentHashMap templates = new ConcurrentHashMap(); + private ConcurrentLinkedQueue records = new ConcurrentLinkedQueue(); + private ConcurrentLinkedQueue readyToReRun = new ConcurrentLinkedQueue(); + private ConcurrentHashMap> heldSets = new ConcurrentHashMap>(); + + // private int fileLength; + private int exportTime; + private int sequenceNumber; + private int domainId; + private int sourceId; + + public 
NetflowParser() { + super(); + } + + public void setStream(DataInputStream s) throws IOException { + if (this.ins != null) { + this.ins.close(); + } + this.ins = s; + } + + private void parseStream(DataInputStream s) throws IOException { + while (s.available() > 4) { + int fileLength = processHeader(s); + // check this offset calculation to ensure if matches the right header overhead + int processed = 16; + while (processed < fileLength) { + processed += processSet(s); + logger.debug(String.format("Finished set, processed = %d of fileLength = %d", + new Object[] { processed, fileLength })); + } + logger.debug(String.format("Templates: %d, Records: %d in %d bytes", + new Object[] { this.templates.size(), this.records.size(), processed })); + } + } + + public void parse() throws IOException { + if (ins == null) { + throw new IOException("No stream attached to parser"); + } else { + parseStream(ins); + } + // process any packets which have been delayed, but can now be processed because + // their templates are available. 
+ rerun(); + } + + private void rerun() throws IOException { + byte[] replay; + replay = readyToReRun.poll(); + while (replay != null) { + DataInputStream s = new DataInputStream(new ByteArrayInputStream(replay)); + parseStream(s); + replay = readyToReRun.poll(); + } + } + + private int processHeader(DataInputStream s) throws IOException { + int version = s.readUnsignedShort(); + if (version != 10) { + throw new IOException("Invalid file version"); + } + int fileLength = s.readUnsignedShort(); + exportTime = s.readInt(); + sequenceNumber = s.readInt(); + domainId = s.readInt(); + + logger.debug(String.format("Message Headers %d bytes at %d sequence=%d, domain=%d", + new Object[] { fileLength, exportTime, sequenceNumber, domainId })); + return fileLength; + } + + /** + * @return number of bytes processed + */ + private int processSet(DataInputStream s) throws IOException { + int setId = s.readUnsignedShort(); + int length = s.readUnsignedShort(); + + int bytes = length; + + // what type of record are we dealing with? 
+ if (setId == 2) { + // regular template + bytes = processTemplate(s, length, false); + } else if (setId == 3) { + // option template + bytes = processTemplate(s, length, true); + } else if (setId > 255) { + // data packet + NetflowTemplateKey key = new NetflowTemplateKey(sourceId, domainId, setId); + NetflowTemplate template = templates.get(key); + if (template == null) { + logger.error(String.format("Missing template: %s", new Object[] { key })); + // Save the set for later by constructing an IPFIX packet for replay + byte[] replayBytes = new byte[length + 16 + 4]; + ByteBuffer bb = ByteBuffer.wrap(replayBytes); + bb.put((byte) 0x00); + bb.put((byte) 0x0a); + bb.putShort((short) (length + 16 + 4)); + bb.putInt(exportTime); + bb.putInt(sequenceNumber); + bb.putInt(domainId); + + bb.putShort((short) setId); + bb.putShort((short) length); + + byte[] setBytes = new byte[length]; + s.read(setBytes, 0, length); + bb.put(setBytes); + + // store the packet to re-process based on arrival of a template for the given + // domain and template id + if (!heldSets.containsKey(key)) { + heldSets.put(key, new ConcurrentLinkedQueue()); + } + heldSets.get(key).add(bb.array()); + logger.debug(String.format("Holding set for template %x (%d bytes)", + new Object[] { setId, bb.array().length })); + } else { + logger.debug(String.format("Using template %s", new Object[] { template.toString() })); + bytes = processDataRecord(s, template, length); + } + } else { + // strange packet, log content + byte[] b = new byte[length]; + s.read(b, 0, length); + logger.error(String.format("Unexpected reserved set id %d for %d bytes [%s]", + new Object[] { setId, length, Hex.encodeHexString(b) })); + } + logger.debug(String.format("Processed Set %x of %d bytes using %d", new Object[] { setId, length, bytes })); + if (bytes < length) { + s.skip(length + 4 - bytes); + } + return length + 4; + } + + private int processTemplate(DataInputStream s, int length, boolean option) throws IOException { + int 
bytesProcessed = 4; + while (bytesProcessed < length) { + int templateId = s.readUnsignedShort(); + int fieldCount = s.readUnsignedShort(); + bytesProcessed += 4; + int scopeFieldCount = 0; + if (option) { + scopeFieldCount = s.readUnsignedShort(); + bytesProcessed += 2; + } + NetflowTemplateKey key = new NetflowTemplateKey(sourceId, domainId, templateId); + if (fieldCount == 0 && this.templates.containsKey(key)) { + // template withdrawn + this.templates.remove(key); + } else { + NetflowTemplate template = new NetflowTemplate(key); + for (int i = 0, l = fieldCount; i < l; i++) { + NetflowField field = processField(s); + template.add(field); + bytesProcessed += 4; + if (field.en != 0) { + bytesProcessed += 4; + } + } + + logger.debug(String.format( + "Processed Template %x with %d fields and %d scopeFields (bytesProcessed=%d, length=%d)", + new Object[] { key.id, fieldCount, scopeFieldCount, bytesProcessed, length })); + if (logger.isDebugEnabled()) { + for (NetflowField field : template.fields) { + logger.debug(field.toString()); + } + } + this.templates.put(key, template); + // check for any saved packets that can now be reprocessed + logger.debug( + String.format("Checking heldsets (%d) key: %s", new Object[] { this.heldSets.size(), key })); + + if (this.heldSets.containsKey(key)) { + logger.debug(String.format("Re-running held datasets for template %s", new Object[] { key })); + this.readyToReRun.addAll(this.heldSets.get(key)); + this.heldSets.remove(key); + rerun(); + } + } + } + return bytesProcessed; + } + + private NetflowField processField(DataInputStream s) throws IOException { + int type = s.readUnsignedShort(); + int len = s.readUnsignedShort(); + int en = 0; + if ((type & 0x8000) == 0x8000) { + en = s.readInt(); + } + return new NetflowField((type & 0x7FFF), len, en); + } + + private int processDataRecord(DataInputStream s, NetflowTemplate template, int length) throws IOException { + int bytesProcessed = 4; + while (bytesProcessed < length) { + 
NetflowRecord record = new NetflowRecord(); + record.setTemplate(template); + for (NetflowField item : template.fields) { + int fieldLength = item.length; + if (fieldLength == 65535) { + // this is a variable length field, which means the first byte is the length of + // the value + fieldLength = s.read(); + if (fieldLength == 255) { + fieldLength = s.readUnsignedShort(); + bytesProcessed += 2; + } + bytesProcessed += 1; + logger.debug(String.format("Custom length found %d of type %d", + new Object[] { fieldLength, item.type })); + } + if (fieldLength<0) {fieldLength=0;} + byte[] dst = new byte[fieldLength]; + s.read(dst, 0, fieldLength); + bytesProcessed += fieldLength; + record.put(item.getName(), item.convertToString(dst)); + } + this.records.add(record); + logger.debug(String.format("Flow Record (template=%x, length=%d, bytes=%d)", + new Object[] { template.key.id, length, bytesProcessed })); + logger.debug(record.toString()); + } + return bytesProcessed; + } + + public Map getTemplates() { + return templates; + } + + public Queue getRecords() { + return records; + } + + /** + * Build a list of all known fields in the templates + * + * @return Set of all the field identifiers across all templates + */ + public Set getAllKnownFields() { + Set fieldTypes = this.templates.values().stream().flatMap(x -> x.fields.stream().map(f -> f.type)) + .collect(Collectors.toSet()); + return fieldTypes; + } + + public void close() throws IOException { + this.ins.close(); + } + + public Set getAllKnownFieldsTypes() { + // get every field, and convert to name + return this.templates.values().stream().map(t -> t.fields).flatMap(List::stream).map(f -> f.getName()) + .collect(Collectors.toSet()); + } + + public void setSourceId(int sourceId) { + this.sourceId = sourceId; + } + +} diff --git a/nifi-netflow-nar-0.0.1.nar b/nifi-netflow-nar-0.0.1.nar new file mode 100644 index 0000000..0062757 Binary files /dev/null and b/nifi-netflow-nar-0.0.1.nar differ diff --git 
a/nifi-netflow-processors/src/main/java/org/apache/nifi/netflow/IANA_IPFIX_TYPES.java b/nifi-netflow-processors/src/main/java/org/apache/nifi/netflow/IANA_IPFIX_TYPES.java index adb5b93..a6cbde2 100644 --- a/nifi-netflow-processors/src/main/java/org/apache/nifi/netflow/IANA_IPFIX_TYPES.java +++ b/nifi-netflow-processors/src/main/java/org/apache/nifi/netflow/IANA_IPFIX_TYPES.java @@ -57,10 +57,12 @@ public enum IANA_IPFIX_TYPES { public static long bytesToLong(byte[] b, int c) { long result = 0; + if (c!=b.length) {c=b.length;} + if (b.length!=0) { for (int i = 0; i < c; i++) { result <<= 8; result |= (b[i] & 0xFF); - } + }} return result; } @@ -109,8 +111,7 @@ public String convertToString(byte[] dst, int len) { return String.valueOf(bytesToLong(dst, len > 0 ? len : bytes)); } else { if (signed) { - return String.valueOf( - ((dst[0] & 0x8000) == 0x8000 ? -1 : 1) * bytesToLong(dst, len > 0 ? len : bytes)); + return String.valueOf(bytesToLong(dst, len > 0 ? len : bytes)); } else { return String.valueOf(bytesToLong(dst, len > 0 ? 
len : bytes)); } @@ -151,4 +152,4 @@ public boolean isFloat() { public boolean isDate() { return date; } -} \ No newline at end of file +} diff --git a/nifi-netflow-processors/src/main/java/org/apache/nifi/netflow/NetflowParser.java b/nifi-netflow-processors/src/main/java/org/apache/nifi/netflow/NetflowParser.java index 93725e6..e948eec 100644 --- a/nifi-netflow-processors/src/main/java/org/apache/nifi/netflow/NetflowParser.java +++ b/nifi-netflow-processors/src/main/java/org/apache/nifi/netflow/NetflowParser.java @@ -380,6 +380,7 @@ private int processDataRecord(DataInputStream s, NetflowTemplate template, int l logger.debug(String.format("Custom length found %d of type %d", new Object[] { fieldLength, item.type })); } + if (fieldLength<0) {fieldLength=0;} byte[] dst = new byte[fieldLength]; s.read(dst, 0, fieldLength); bytesProcessed += fieldLength; diff --git a/nifi-netflow.xml b/nifi-netflow.xml new file mode 100644 index 0000000..42ed1e2 --- /dev/null +++ b/nifi-netflow.xml @@ -0,0 +1,519 @@ + +