From 4909f51578ca084efad60aefec2eff1ccf8ba6a8 Mon Sep 17 00:00:00 2001 From: Annapurna Date: Thu, 27 Mar 2014 09:32:17 +0530 Subject: [PATCH] Add data metrics to audit message --- .../com/inmobi/audit/thrift/AuditMessage.java | 294 ++++++++++++- .../com/inmobi/audit/thrift/AuditMetrics.java | 398 ++++++++++++++++++ .../publisher/AbstractMessagePublisher.java | 5 +- .../publisher/AuditCounterAccumulator.java | 65 ++- .../messaging/publisher/AuditService.java | 17 +- .../com/inmobi/messaging/util/AuditUtil.java | 4 +- .../src/main/resources/audit.thrift | 9 +- .../consumer/SampleTestConsumer.java | 5 +- .../messaging/publisher/TestAuditService.java | 2 +- .../messaging/publisher/TestPublisher.java | 40 +- 10 files changed, 797 insertions(+), 42 deletions(-) create mode 100644 messaging-client-core/src/main/java/com/inmobi/audit/thrift/AuditMetrics.java diff --git a/messaging-client-core/src/main/java/com/inmobi/audit/thrift/AuditMessage.java b/messaging-client-core/src/main/java/com/inmobi/audit/thrift/AuditMessage.java index 0b0c9bc6..391495c6 100644 --- a/messaging-client-core/src/main/java/com/inmobi/audit/thrift/AuditMessage.java +++ b/messaging-client-core/src/main/java/com/inmobi/audit/thrift/AuditMessage.java @@ -38,6 +38,8 @@ public class AuditMessage implements TBase, private static final TField SENT_FIELD_DESC = new TField("sent", TType.MAP, (short)7); private static final TField FILENAMES_FIELD_DESC = new TField("filenames", TType.LIST, (short)8); private static final TField TAGS_FIELD_DESC = new TField("tags", TType.MAP, (short)9); + private static final TField RECEIVED_METRICS_FIELD_DESC = new TField("receivedMetrics", TType.MAP, (short)10); + private static final TField SENT_METRICS_FIELD_DESC = new TField("sentMetrics", TType.MAP, (short)11); public long timestamp; public String topic; @@ -48,6 +50,8 @@ public class AuditMessage implements TBase, public Map sent; public List filenames; public Map tags; + public Map receivedMetrics; + public Map sentMetrics; /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements TFieldIdEnum { @@ -59,7 +63,9 @@ public enum _Fields implements TFieldIdEnum { RECEIVED((short)6, "received"), SENT((short)7, "sent"), FILENAMES((short)8, "filenames"), - TAGS((short)9, "tags"); + TAGS((short)9, "tags"), + RECEIVED_METRICS((short)10, "receivedMetrics"), + SENT_METRICS((short)11, "sentMetrics"); private static final Map byName = new HashMap(); @@ -92,6 +98,10 @@ public static _Fields findByThriftId(int fieldId) { return FILENAMES; case 9: // TAGS return TAGS; + case 10: // RECEIVED_METRICS + return RECEIVED_METRICS; + case 11: // SENT_METRICS + return SENT_METRICS; default: return null; } @@ -164,6 +174,14 @@ public String getFieldName() { new MapMetaData(TType.MAP, new FieldValueMetaData(TType.STRING), new FieldValueMetaData(TType.STRING)))); + tmpMap.put(_Fields.RECEIVED_METRICS, new FieldMetaData("receivedMetrics", TFieldRequirementType.DEFAULT, + new MapMetaData(TType.MAP, + new FieldValueMetaData(TType.I64), + new StructMetaData(TType.STRUCT, AuditMetrics.class)))); + tmpMap.put(_Fields.SENT_METRICS, new FieldMetaData("sentMetrics", TFieldRequirementType.DEFAULT, + new MapMetaData(TType.MAP, + new FieldValueMetaData(TType.I64), + new StructMetaData(TType.STRUCT, AuditMetrics.class)))); metaDataMap = Collections.unmodifiableMap(tmpMap); FieldMetaData.addStructMetaDataMap(AuditMessage.class, metaDataMap); } @@ -180,7 +198,9 @@ public AuditMessage( Map received, Map sent, List filenames, - Map tags) + Map tags, + Map receivedMetrics, + Map sentMetrics) { this(); this.timestamp = timestamp; @@ -194,6 +214,8 @@ public AuditMessage( this.sent = sent; this.filenames = filenames; this.tags = tags; + this.receivedMetrics = receivedMetrics; + this.sentMetrics = sentMetrics; } /** @@ -265,6 +287,36 @@ public AuditMessage(AuditMessage other) { } this.tags = __this__tags; } + if (other.isSetReceivedMetrics()) { + Map __this__receivedMetrics = new HashMap(); + for (Map.Entry other_element : other.receivedMetrics.entrySet()) { + + Long other_element_key = other_element.getKey(); + AuditMetrics other_element_value = other_element.getValue(); + + Long __this__receivedMetrics_copy_key = other_element_key; + + AuditMetrics __this__receivedMetrics_copy_value = new AuditMetrics(other_element_value); + + __this__receivedMetrics.put(__this__receivedMetrics_copy_key, __this__receivedMetrics_copy_value); + } + this.receivedMetrics = __this__receivedMetrics; + } + if (other.isSetSentMetrics()) { + Map __this__sentMetrics = new HashMap(); + for (Map.Entry other_element : other.sentMetrics.entrySet()) { + + Long other_element_key = other_element.getKey(); + AuditMetrics other_element_value = other_element.getValue(); + + Long __this__sentMetrics_copy_key = other_element_key; + + AuditMetrics __this__sentMetrics_copy_value = new AuditMetrics(other_element_value); + + __this__sentMetrics.put(__this__sentMetrics_copy_key, __this__sentMetrics_copy_value); + } + this.sentMetrics = __this__sentMetrics; + } } public AuditMessage deepCopy() { @@ -289,6 +341,8 @@ public void clear() { this.sent = null; this.filenames = null; this.tags = null; + this.receivedMetrics = null; + this.sentMetrics = null; } public long getTimestamp() { @@ -553,6 +607,76 @@ public void setTagsIsSet(boolean value) { } } + public int getReceivedMetricsSize() { + return (this.receivedMetrics == null) ? 0 : this.receivedMetrics.size(); + } + + public void putToReceivedMetrics(long key, AuditMetrics val) { + if (this.receivedMetrics == null) { + this.receivedMetrics = new HashMap(); + } + this.receivedMetrics.put(key, val); + } + + public Map getReceivedMetrics() { + return this.receivedMetrics; + } + + public AuditMessage setReceivedMetrics(Map receivedMetrics) { + this.receivedMetrics = receivedMetrics; + return this; + } + + public void unsetReceivedMetrics() { + this.receivedMetrics = null; + } + + /** Returns true if field receivedMetrics is set (has been asigned a value) and false otherwise */ + public boolean isSetReceivedMetrics() { + return this.receivedMetrics != null; + } + + public void setReceivedMetricsIsSet(boolean value) { + if (!value) { + this.receivedMetrics = null; + } + } + + public int getSentMetricsSize() { + return (this.sentMetrics == null) ? 0 : this.sentMetrics.size(); + } + + public void putToSentMetrics(long key, AuditMetrics val) { + if (this.sentMetrics == null) { + this.sentMetrics = new HashMap(); + } + this.sentMetrics.put(key, val); + } + + public Map getSentMetrics() { + return this.sentMetrics; + } + + public AuditMessage setSentMetrics(Map sentMetrics) { + this.sentMetrics = sentMetrics; + return this; + } + + public void unsetSentMetrics() { + this.sentMetrics = null; + } + + /** Returns true if field sentMetrics is set (has been asigned a value) and false otherwise */ + public boolean isSetSentMetrics() { + return this.sentMetrics != null; + } + + public void setSentMetricsIsSet(boolean value) { + if (!value) { + this.sentMetrics = null; + } + } + public void setFieldValue(_Fields field, Object value) { switch (field) { case TIMESTAMP: @@ -627,6 +751,22 @@ public void setFieldValue(_Fields field, Object value) { } break; + case RECEIVED_METRICS: + if (value == null) { + unsetReceivedMetrics(); + } else { + setReceivedMetrics((Map)value); + } + break; + + case SENT_METRICS: + if (value == null) { + unsetSentMetrics(); + } else { + setSentMetrics((Map)value); + } + break; + } } @@ -663,6 +803,12 @@ public Object getFieldValue(_Fields field) { case TAGS: return getTags(); + case RECEIVED_METRICS: + return getReceivedMetrics(); + + case SENT_METRICS: + return getSentMetrics(); + } throw new IllegalStateException(); } @@ -692,6 +838,10 @@ public boolean isSet(_Fields field) { return isSetFilenames(); case TAGS: return isSetTags(); + case RECEIVED_METRICS: + return isSetReceivedMetrics(); + case SENT_METRICS: + return isSetSentMetrics(); } throw new IllegalStateException(); } @@ -794,6 +944,24 @@ public boolean equals(AuditMessage that) { return false; } + boolean this_present_receivedMetrics = true && this.isSetReceivedMetrics(); + boolean that_present_receivedMetrics = true && that.isSetReceivedMetrics(); + if (this_present_receivedMetrics || that_present_receivedMetrics) { + if (!(this_present_receivedMetrics && that_present_receivedMetrics)) + return false; + if (!this.receivedMetrics.equals(that.receivedMetrics)) + return false; + } + + boolean this_present_sentMetrics = true && this.isSetSentMetrics(); + boolean that_present_sentMetrics = true && that.isSetSentMetrics(); + if (this_present_sentMetrics || that_present_sentMetrics) { + if (!(this_present_sentMetrics && that_present_sentMetrics)) + return false; + if (!this.sentMetrics.equals(that.sentMetrics)) + return false; + } + return true; } @@ -891,6 +1059,24 @@ public int compareTo(AuditMessage other) { return lastComparison; } } + lastComparison = Boolean.valueOf(isSetReceivedMetrics()).compareTo(typedOther.isSetReceivedMetrics()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetReceivedMetrics()) { lastComparison = TBaseHelper.compareTo(this.receivedMetrics, typedOther.receivedMetrics); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetSentMetrics()).compareTo(typedOther.isSetSentMetrics()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSentMetrics()) { lastComparison = TBaseHelper.compareTo(this.sentMetrics, typedOther.sentMetrics); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1015,6 +1201,46 @@ public void read(TProtocol iprot) throws TException { TProtocolUtil.skip(iprot, field.type); } break; + case 10: // RECEIVED_METRICS + if (field.type == TType.MAP) { + { + TMap _map15 = iprot.readMapBegin(); + this.receivedMetrics = new HashMap(2*_map15.size); + for (int _i16 = 0; _i16 < _map15.size; ++_i16) + { + long _key17; + AuditMetrics _val18; + _key17 = iprot.readI64(); + _val18 = new AuditMetrics(); + _val18.read(iprot); + this.receivedMetrics.put(_key17, _val18); + } + iprot.readMapEnd(); + } + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + case 11: // SENT_METRICS + if (field.type == TType.MAP) { + { + TMap _map19 = iprot.readMapBegin(); + this.sentMetrics = new HashMap(2*_map19.size); + for (int _i20 = 0; _i20 < _map19.size; ++_i20) + { + long _key21; + AuditMetrics _val22; + _key21 = iprot.readI64(); + _val22 = new AuditMetrics(); + _val22.read(iprot); + this.sentMetrics.put(_key21, _val22); + } + iprot.readMapEnd(); + } + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; default: TProtocolUtil.skip(iprot, field.type); } @@ -1055,10 +1281,10 @@ public void write(TProtocol oprot) throws TException { oprot.writeFieldBegin(RECEIVED_FIELD_DESC); { oprot.writeMapBegin(new TMap(TType.I64, TType.I64, this.received.size())); - for (Map.Entry _iter15 : this.received.entrySet()) + for (Map.Entry _iter23 : this.received.entrySet()) { - oprot.writeI64(_iter15.getKey()); - oprot.writeI64(_iter15.getValue()); + oprot.writeI64(_iter23.getKey()); + oprot.writeI64(_iter23.getValue()); } oprot.writeMapEnd(); } @@ -1068,10 +1294,10 @@ public void write(TProtocol oprot) throws TException { oprot.writeFieldBegin(SENT_FIELD_DESC); { oprot.writeMapBegin(new TMap(TType.I64, TType.I64, this.sent.size())); - for (Map.Entry _iter16 : this.sent.entrySet()) + for (Map.Entry _iter24 : this.sent.entrySet()) { - oprot.writeI64(_iter16.getKey()); - oprot.writeI64(_iter16.getValue()); + oprot.writeI64(_iter24.getKey()); + oprot.writeI64(_iter24.getValue()); } oprot.writeMapEnd(); } @@ -1081,9 +1307,9 @@ public void write(TProtocol oprot) throws TException { oprot.writeFieldBegin(FILENAMES_FIELD_DESC); { oprot.writeListBegin(new TList(TType.STRING, this.filenames.size())); - for (String _iter17 : this.filenames) + for (String _iter25 : this.filenames) { - oprot.writeString(_iter17); + oprot.writeString(_iter25); } oprot.writeListEnd(); } @@ -1093,10 +1319,36 @@ public void write(TProtocol oprot) throws TException { oprot.writeFieldBegin(TAGS_FIELD_DESC); { oprot.writeMapBegin(new TMap(TType.STRING, TType.STRING, this.tags.size())); - for (Map.Entry _iter18 : this.tags.entrySet()) + for (Map.Entry _iter26 : this.tags.entrySet()) { - oprot.writeString(_iter18.getKey()); - oprot.writeString(_iter18.getValue()); + oprot.writeString(_iter26.getKey()); + oprot.writeString(_iter26.getValue()); + } + oprot.writeMapEnd(); + } + oprot.writeFieldEnd(); + } + if (this.receivedMetrics != null) { + oprot.writeFieldBegin(RECEIVED_METRICS_FIELD_DESC); + { + oprot.writeMapBegin(new TMap(TType.I64, TType.STRUCT, this.receivedMetrics.size())); + for (Map.Entry _iter27 : this.receivedMetrics.entrySet()) + { + oprot.writeI64(_iter27.getKey()); + _iter27.getValue().write(oprot); + } + oprot.writeMapEnd(); + } + oprot.writeFieldEnd(); + } + if (this.sentMetrics != null) { + oprot.writeFieldBegin(SENT_METRICS_FIELD_DESC); + { + oprot.writeMapBegin(new TMap(TType.I64, TType.STRUCT, this.sentMetrics.size())); + for (Map.Entry _iter28 : this.sentMetrics.entrySet()) + { + oprot.writeI64(_iter28.getKey()); + _iter28.getValue().write(oprot); } oprot.writeMapEnd(); } @@ -1174,6 +1426,22 @@ public String toString() { sb.append(this.tags); } first = false; + if (!first) sb.append(", "); + sb.append("receivedMetrics:"); + if (this.receivedMetrics == null) { + sb.append("null"); + } else { + sb.append(this.receivedMetrics); + } + first = false; + if (!first) sb.append(", "); + sb.append("sentMetrics:"); + if (this.sentMetrics == null) { + sb.append("null"); + } else { + sb.append(this.sentMetrics); + } + first = false; sb.append(")"); return sb.toString(); } diff --git a/messaging-client-core/src/main/java/com/inmobi/audit/thrift/AuditMetrics.java b/messaging-client-core/src/main/java/com/inmobi/audit/thrift/AuditMetrics.java new file mode 100644 index 00000000..971cdaeb --- /dev/null +++ b/messaging-client-core/src/main/java/com/inmobi/audit/thrift/AuditMetrics.java @@ -0,0 +1,398 @@ +/** + * Autogenerated by Thrift + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + */ +package com.inmobi.audit.thrift; + +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.thrift.*; +import org.apache.thrift.async.*; +import org.apache.thrift.meta_data.*; +import org.apache.thrift.transport.*; +import org.apache.thrift.protocol.*; + +public class AuditMetrics implements TBase, java.io.Serializable, Cloneable { + private static final TStruct STRUCT_DESC = new TStruct("AuditMetrics"); + + private static final TField COUNT_FIELD_DESC = new TField("count", TType.I64, (short)1); + private static final TField SIZE_FIELD_DESC = new TField("size", TType.I64, (short)2); + + public long count; + public long size; + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements TFieldIdEnum { + COUNT((short)1, "count"), + SIZE((short)2, "size"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // COUNT + return COUNT; + case 2: // SIZE + return SIZE; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __COUNT_ISSET_ID = 0; + private static final int __SIZE_ISSET_ID = 1; + private BitSet __isset_bit_vector = new BitSet(2); + + public static final Map<_Fields, FieldMetaData> metaDataMap; + static { + Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.COUNT, new FieldMetaData("count", TFieldRequirementType.DEFAULT, + new FieldValueMetaData(TType.I64))); + tmpMap.put(_Fields.SIZE, new FieldMetaData("size", TFieldRequirementType.DEFAULT, + new FieldValueMetaData(TType.I64))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + FieldMetaData.addStructMetaDataMap(AuditMetrics.class, metaDataMap); + } + + public AuditMetrics() { + } + + public AuditMetrics( + long count, + long size) + { + this(); + this.count = count; + setCountIsSet(true); + this.size = size; + setSizeIsSet(true); + } + + /** + * Performs a deep copy on other. + */ + public AuditMetrics(AuditMetrics other) { + __isset_bit_vector.clear(); + __isset_bit_vector.or(other.__isset_bit_vector); + this.count = other.count; + this.size = other.size; + } + + public AuditMetrics deepCopy() { + return new AuditMetrics(this); + } + + @Deprecated + public AuditMetrics clone() { + return new AuditMetrics(this); + } + + @Override + public void clear() { + setCountIsSet(false); + this.count = 0; + setSizeIsSet(false); + this.size = 0; + } + + public long getCount() { + return this.count; + } + + public AuditMetrics setCount(long count) { + this.count = count; + setCountIsSet(true); + return this; + } + + public void unsetCount() { + __isset_bit_vector.clear(__COUNT_ISSET_ID); + } + + /** Returns true if field count is set (has been asigned a value) and false otherwise */ + public boolean isSetCount() { + return __isset_bit_vector.get(__COUNT_ISSET_ID); + } + + public void setCountIsSet(boolean value) { + __isset_bit_vector.set(__COUNT_ISSET_ID, value); + } + + public long getSize() { + return this.size; + } + + public AuditMetrics setSize(long size) { + this.size = size; + setSizeIsSet(true); + return this; + } + + public void unsetSize() { + __isset_bit_vector.clear(__SIZE_ISSET_ID); + } + + /** Returns true if field size is set (has been asigned a value) and false otherwise */ + public boolean isSetSize() { + return __isset_bit_vector.get(__SIZE_ISSET_ID); + } + + public void setSizeIsSet(boolean value) { + __isset_bit_vector.set(__SIZE_ISSET_ID, value); + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case COUNT: + if (value == null) { + unsetCount(); + } else { + setCount((Long)value); + } + break; + + case SIZE: + if (value == null) { + unsetSize(); + } else { + setSize((Long)value); + } + break; + + } + } + + public void setFieldValue(int fieldID, Object value) { + setFieldValue(_Fields.findByThriftIdOrThrow(fieldID), value); + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case COUNT: + return new Long(getCount()); + + case SIZE: + return new Long(getSize()); + + } + throw new IllegalStateException(); + } + + public Object getFieldValue(int fieldId) { + return getFieldValue(_Fields.findByThriftIdOrThrow(fieldId)); + } + + /** Returns true if field corresponding to fieldID is set (has been asigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + switch (field) { + case COUNT: + return isSetCount(); + case SIZE: + return isSetSize(); + } + throw new IllegalStateException(); + } + + public boolean isSet(int fieldID) { + return isSet(_Fields.findByThriftIdOrThrow(fieldID)); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof AuditMetrics) + return this.equals((AuditMetrics)that); + return false; + } + + public boolean equals(AuditMetrics that) { + if (that == null) + return false; + + boolean this_present_count = true; + boolean that_present_count = true; + if (this_present_count || that_present_count) { + if (!(this_present_count && that_present_count)) + return false; + if (this.count != that.count) + return false; + } + + boolean this_present_size = true; + boolean that_present_size = true; + if (this_present_size || that_present_size) { + if (!(this_present_size && that_present_size)) + return false; + if (this.size != that.size) + return false; + } + + return true; + } + + @Override + public int hashCode() { + return 0; + } + + public int compareTo(AuditMetrics other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + AuditMetrics typedOther = (AuditMetrics)other; + + lastComparison = Boolean.valueOf(isSetCount()).compareTo(typedOther.isSetCount()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetCount()) { lastComparison = TBaseHelper.compareTo(this.count, typedOther.count); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetSize()).compareTo(typedOther.isSetSize()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSize()) { lastComparison = TBaseHelper.compareTo(this.size, typedOther.size); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public void read(TProtocol iprot) throws TException { + TField field; + iprot.readStructBegin(); + while (true) + { + field = iprot.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + switch (field.id) { + case 1: // COUNT + if (field.type == TType.I64) { + this.count = iprot.readI64(); + setCountIsSet(true); + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + case 2: // SIZE + if (field.type == TType.I64) { + this.size = iprot.readI64(); + setSizeIsSet(true); + } else { + TProtocolUtil.skip(iprot, field.type); + } + break; + default: + TProtocolUtil.skip(iprot, field.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + validate(); + } + + public void write(TProtocol oprot) throws TException { + validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldBegin(COUNT_FIELD_DESC); + oprot.writeI64(this.count); + oprot.writeFieldEnd(); + oprot.writeFieldBegin(SIZE_FIELD_DESC); + oprot.writeI64(this.size); + oprot.writeFieldEnd(); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("AuditMetrics("); + boolean first = true; + + sb.append("count:"); + sb.append(this.count); + first = false; + if (!first) sb.append(", "); + sb.append("size:"); + sb.append(this.size); + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws TException { + // check for required fields + } + +} + diff --git a/messaging-client-core/src/main/java/com/inmobi/messaging/publisher/AbstractMessagePublisher.java b/messaging-client-core/src/main/java/com/inmobi/messaging/publisher/AbstractMessagePublisher.java index 46da4e28..f0be9b4d 100644 --- a/messaging-client-core/src/main/java/com/inmobi/messaging/publisher/AbstractMessagePublisher.java +++ b/messaging-client-core/src/main/java/com/inmobi/messaging/publisher/AbstractMessagePublisher.java @@ -56,11 +56,12 @@ public void publish(String topicName, Message m) { void publish(String topicName, Message m, boolean isPublishedByAuditService) { Long timestamp = null; + Long messageLength = 0l; if (!isPublishedByAuditService && isAuditEnabled) { // Add timstamp to the message timestamp = new Date().getTime(); + messageLength = Long.valueOf(m.getData().array().length); AuditUtil.attachHeaders(m, timestamp); - } // initialization should happen only by one thread synchronized (this) { @@ -71,7 +72,7 @@ void publish(String topicName, Message m, getStats(topicName).accumulateInvocation(); initTopic(topicName, getStats(topicName)); if (!isPublishedByAuditService && isAuditEnabled) { - auditService.incrementReceived(topicName, timestamp); + auditService.incrementReceived(topicName, timestamp, messageLength); } } // TODO: generate headers diff --git a/messaging-client-core/src/main/java/com/inmobi/messaging/publisher/AuditCounterAccumulator.java b/messaging-client-core/src/main/java/com/inmobi/messaging/publisher/AuditCounterAccumulator.java index 93773ca6..b6f946f4 100644 --- a/messaging-client-core/src/main/java/com/inmobi/messaging/publisher/AuditCounterAccumulator.java +++ b/messaging-client-core/src/main/java/com/inmobi/messaging/publisher/AuditCounterAccumulator.java @@ -3,6 +3,7 @@ import java.util.HashMap; import java.util.Map; +import com.inmobi.audit.thrift.AuditMetrics; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -17,12 +18,16 @@ public class AuditCounterAccumulator { private static final Log LOG = LogFactory. getLog(AuditCounterAccumulator.class); private Counters counters = new Counters(new HashMap(), - new HashMap()); + new HashMap(), new HashMap(), new HashMap()); private int windowSize; class Counters { + //Map received and sent will not be removed till feeder + // change private HashMap received; private HashMap sent; + private HashMap receivedMetrics; + private HashMap sentMetrics; public Map getReceived() { return received; @@ -32,14 +37,31 @@ public Map getSent() { return sent; } - Counters(HashMap received, HashMap sent) { + public Map getReceivedMetrics() { + return receivedMetrics; + } + + public Map getSentMetrics() { + return sentMetrics; + } + + Counters(HashMap received, HashMap sent, + HashMap receivedMetrics, HashMap sentMetrics) { + this(receivedMetrics, sentMetrics); this.received = received; this.sent = sent; } + + Counters(HashMap receivedMetrics, HashMap sentMetrics) { + this.receivedMetrics = receivedMetrics; + this.sentMetrics = sentMetrics; + } + } AuditCounterAccumulator(int windowSize) { - this.windowSize = windowSize; } @@ -48,28 +70,55 @@ private Long getWindow(Long timestamp) { return window; } - void incrementReceived(Long timestamp) { + void incrementReceived(Long timestamp, Long messageLength) { Long window = getWindow(timestamp); if (!counters.received.containsKey(window)) { - counters.received.put(window, Long.valueOf(0)); + counters.received.put(window, (long) 0); } counters.received.put(window, counters.received.get(window) + 1); + + AuditMetrics metrics; + if (!counters.receivedMetrics.containsKey(window)) { + metrics = new AuditMetrics(0, 0); + } else { + metrics = counters.receivedMetrics.get(window); + } + long newCount = metrics.getCount() + 1; + long newSize = metrics.getSize() + messageLength; + metrics.setCount(newCount); + metrics.setSize(newSize); + counters.receivedMetrics.put(window, metrics); } - void incrementSent(Long timestamp) { + void incrementSent(Long timestamp, Long messageLength) { Long window = getWindow(timestamp); if (!counters.sent.containsKey(window)) { - counters.sent.put(window, Long.valueOf(0)); + counters.sent.put(window, (long) 0); } counters.sent.put(window, counters.sent.get(window) + 1); + AuditMetrics metrics; + if (!counters.sentMetrics.containsKey(window)) { + metrics = new AuditMetrics(0, 0); + } else { + metrics = counters.sentMetrics.get(window); + } + long newCount = metrics.getCount() + 1; + long newSize = metrics.getSize() + messageLength; + metrics.setCount(newCount); + metrics.setSize(newSize); + counters.sentMetrics.put(window, metrics); + } Counters getAndReset() { Counters returnValue; - returnValue = new Counters(counters.received, counters.sent); + returnValue = new Counters(counters.received, counters.sent, + counters.receivedMetrics, counters.sentMetrics); counters.received = new HashMap(); counters.sent = new HashMap(); + counters.receivedMetrics = new HashMap(); + counters.sentMetrics = new HashMap(); return returnValue; } } diff --git a/messaging-client-core/src/main/java/com/inmobi/messaging/publisher/AuditService.java b/messaging-client-core/src/main/java/com/inmobi/messaging/publisher/AuditService.java index fd5dc12e..2f617210 100644 --- a/messaging-client-core/src/main/java/com/inmobi/messaging/publisher/AuditService.java +++ b/messaging-client-core/src/main/java/com/inmobi/messaging/publisher/AuditService.java @@ -37,6 +37,7 @@ class AuditService { final HashMap topicAccumulatorMap = new HashMap(); private final String tier = "publisher"; + public static final String AUDIT_VERSION_TAG = "auditVersion"; private ScheduledThreadPoolExecutor executor; private boolean isInit = false; private AuditWorker worker; @@ -72,8 +73,7 @@ public void run() { + " 0"); continue; } - AuditMessage packet = - createPacket(topic, counters.getReceived(), counters.getSent()); + AuditMessage packet = createPacket(topic, counters); publishPacket(packet); } @@ -96,12 +96,14 @@ private void publishPacket(AuditMessage packet) { } } - private AuditMessage createPacket(String topic, Map received, - Map sent) { + private AuditMessage createPacket(String topic, Counters counters) { long currentTime = new Date().getTime(); + Map tags = new HashMap(); + tags.put(AUDIT_VERSION_TAG, String.valueOf(AuditUtil.currentVersion)); AuditMessage packet = new AuditMessage(currentTime, topic, tier, hostname, windowSize, - received, sent, null, null); + counters.getReceived(), counters.getSent(), null, tags, + counters.getReceivedMetrics(), counters.getSentMetrics()); return packet; } @@ -161,8 +163,9 @@ void close() { } } - void incrementReceived(String topicName, Long timestamp) { + void incrementReceived(String topicName, Long timestamp, + Long messageLength) { AuditCounterAccumulator accumulator = getAccumulator(topicName); - accumulator.incrementReceived(timestamp); + accumulator.incrementReceived(timestamp, messageLength); } } diff --git a/messaging-client-core/src/main/java/com/inmobi/messaging/util/AuditUtil.java b/messaging-client-core/src/main/java/com/inmobi/messaging/util/AuditUtil.java index 4fb9ee28..fdb36cbd 100644 --- a/messaging-client-core/src/main/java/com/inmobi/messaging/util/AuditUtil.java +++ b/messaging-client-core/src/main/java/com/inmobi/messaging/util/AuditUtil.java @@ -11,8 +11,8 @@ public class AuditUtil { static final byte[] magicBytes = {(byte) 0xAB, (byte) 0xCD, (byte) 0xEF }; public static final String AUDIT_STREAM_TOPIC_NAME = "_audit"; - private static final byte[] versions = {1}; - private static final int currentVersion = 1; + private static final byte[] versions = {1, 2}; + public static final int currentVersion = 2; private static final Log LOG = LogFactory.getLog(AuditUtil.class); public static final int HEADER_LENGTH = 16; private static final long BASE_TIME = 1356998400000L; diff --git a/messaging-client-core/src/main/resources/audit.thrift b/messaging-client-core/src/main/resources/audit.thrift index 94397247..d086e6bb 100644 --- a/messaging-client-core/src/main/resources/audit.thrift +++ b/messaging-client-core/src/main/resources/audit.thrift @@ -3,6 +3,11 @@ namespace java com.inmobi.audit.thrift namespace cpp audit.thrift +struct AuditMetrics { + 1: i64 count, + 2: i64 size +} + struct AuditMessage { 1: i64 timestamp, @@ -13,5 +18,7 @@ struct AuditMessage 6: map received, 7: map sent, 8: list filenames, - 9: map tags + 9: map tags, + 10: map receivedMetrics, + 11: map sentMetrics } \ No newline at end of file diff --git a/messaging-client-core/src/test/java/com/inmobi/messaging/consumer/SampleTestConsumer.java b/messaging-client-core/src/test/java/com/inmobi/messaging/consumer/SampleTestConsumer.java index 65225e59..84fa21cd 100644 --- a/messaging-client-core/src/test/java/com/inmobi/messaging/consumer/SampleTestConsumer.java +++ b/messaging-client-core/src/test/java/com/inmobi/messaging/consumer/SampleTestConsumer.java @@ -9,6 +9,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import com.inmobi.audit.thrift.AuditMetrics; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; @@ -23,12 +24,14 @@ public class SampleTestConsumer extends MockConsumer { private List createData() throws IOException, TException { Map received = new HashMap(); + Map receivedMetrics = new HashMap(); long time = System.currentTimeMillis() - 60000; long window = time - time % 60000; received.put(window, 100L); + receivedMetrics.put(window, new AuditMetrics(100L, 1000L)); AuditMessage packet = new AuditMessage(System.currentTimeMillis(), "testTopic", "publisher", "localhost", 1, received, received, null, - null); + null, receivedMetrics, receivedMetrics); TSerializer serializer = new TSerializer(); // serializer.serialize(packet); byte[] output = serializer.serialize(packet); diff --git a/messaging-client-core/src/test/java/com/inmobi/messaging/publisher/TestAuditService.java b/messaging-client-core/src/test/java/com/inmobi/messaging/publisher/TestAuditService.java index d8104ea4..592e5034 100644 --- a/messaging-client-core/src/test/java/com/inmobi/messaging/publisher/TestAuditService.java +++ b/messaging-client-core/src/test/java/com/inmobi/messaging/publisher/TestAuditService.java @@ -19,7 +19,7 @@ public void testAttachHeaders() { ByteBuffer buffer = m.getData(); buffer.rewind(); int version = buffer.get(); - assert (version == 1); + assert (version == 2); byte[] bytes = new byte[3]; buffer.get(bytes); assert (bytes[0] == magicbytes[0]); diff --git a/messaging-client-core/src/test/java/com/inmobi/messaging/publisher/TestPublisher.java b/messaging-client-core/src/test/java/com/inmobi/messaging/publisher/TestPublisher.java index c06f83e2..e07f3e63 100644 --- a/messaging-client-core/src/test/java/com/inmobi/messaging/publisher/TestPublisher.java +++ b/messaging-client-core/src/test/java/com/inmobi/messaging/publisher/TestPublisher.java @@ -6,6 +6,7 @@ import java.util.Collection; import java.util.concurrent.CountDownLatch; +import com.inmobi.audit.thrift.AuditMetrics; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; import org.testng.Assert; @@ -127,22 +128,47 @@ public void testAuditMessage() throws IOException, InterruptedException, MessagePublisherFactory.create(conf, MockInMemoryPublisher.class.getName()); publisher.publish("topic", new Message("message".getBytes())); + publisher.publish("topic", new Message("message2".getBytes())); + publisher.publish("topic1", new Message("message".getBytes())); publisher.close(); + conf.set("topic.name", AuditUtil.AUDIT_STREAM_TOPIC_NAME); conf.set("consumer.name", "c1"); + MessageConsumer consumer = MessageConsumerFactory.create(conf, MockInMemoryConsumer.class.getName()); ((MockInMemoryConsumer) consumer) .setSource(((MockInMemoryPublisher) (publisher)).source); - Message m = consumer.next(); - TDeserializer deserializer = new TDeserializer(); - AuditMessage audit = new AuditMessage(); - deserializer.deserialize(audit, m.getData().array()); - Collection values = audit.getReceived().values(); - assert (values.iterator().hasNext()); - assert (values.iterator().next() == 1); + int numAuditMessages = 2; + int index = 0; + while (index < numAuditMessages) { + Message m = consumer.next(); + TDeserializer deserializer = new TDeserializer(); + AuditMessage audit = new AuditMessage(); + deserializer.deserialize(audit, m.getData().array()); + Collection values = audit.getReceived().values(); + assert (values.iterator().hasNext()); + assert (audit.getReceivedMetrics().size() == 1); + AuditMetrics topicMetrics = audit.getReceivedMetrics().values().iterator + ().next(); + if (audit.getTopic().equals("topic")) { + assert (values.iterator().next() == 2); + assert (topicMetrics.getCount() == 2); + assert (topicMetrics.getSize() == ("message".getBytes().length + + "message2".getBytes().length)); + } else if (audit.getTopic().equals("topic1")) { + assert (values.iterator().next() == 1); + assert (topicMetrics.getCount() == 1); + assert (topicMetrics.getSize() == "message".getBytes().length); + } + assert (audit.getTags() != null); + assert (audit.getTags().size() == 1); + assert (audit.getTags().get(AuditService.AUDIT_VERSION_TAG).equals(String + .valueOf(AuditUtil.currentVersion))); + index++; + } } @Test