Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 28 additions & 23 deletions jpos/src/main/java/org/jpos/util/TPS.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,16 @@
@SuppressWarnings("unused")
public class TPS implements Loggeable {
AtomicInteger count;
Instant start;
Duration start;
AtomicLong readings;
int peak;
AtomicInteger peak;
Instant peakWhen;
static final long FROM_NANOS = 1000000L;
Duration period;
float tps;
float avg;
Timer timer;
boolean autoupdate;
protected long simulatedNanoTime = 0L;
protected Duration simulatedNanoTime = Duration.ZERO;

public TPS() {
this(1000L, false);
Expand All @@ -78,11 +77,21 @@ public TPS(boolean autoupdate) {
* @param autoupdate true to autoupdate
*/
public TPS(final long period, boolean autoupdate) {
this(Duration.ofMillis(period), autoupdate);
}

/**
* @param period as a duration
* @param autoupdate true to autoupdate
*/
public TPS(Duration period, boolean autoupdate) {
super();
count = new AtomicInteger(0);
start = peakWhen = Instant.now();
start = Duration.ofNanos(System.nanoTime());
peak = new AtomicInteger(0);
peakWhen = Instant.now();
readings = new AtomicLong(0L);
this.period = Duration.ofMillis(period);
this.period = period;
this.autoupdate = autoupdate;
if (autoupdate) {
timer = new Timer();
Expand All @@ -91,7 +100,7 @@ public TPS(final long period, boolean autoupdate) {
public void run() {
calcTPS(period);
}
}, period, period);
}, period.toMillis(), period.toMillis());
}
}

Expand All @@ -112,7 +121,7 @@ public float getAvg() {
}

public int getPeak() {
return peak;
return peak.get();
}

public long getPeakWhen() {
Expand All @@ -125,7 +134,7 @@ public long getPeakWhen() {
public void reset() {
synchronized(this) {
avg = 0f;
peak = 0;
peak.set(0);
peakWhen = Instant.EPOCH;
readings.set(0L);
}
Expand All @@ -136,7 +145,7 @@ public long getPeriod() {
}

public long getElapsed() {
return Duration.between(start, Instant.now()).toMillis();
return Duration.ofNanos(System.nanoTime()).minus(start).toMillis();
}

public String toString() {
Expand All @@ -158,14 +167,10 @@ public void dump(PrintStream p, String indent) {
p.println(indent
+ "<tps"
+ (autoupdate ? " auto='true'>" : ">")
+ this.toString()
+ this
+ "</tps>");
}

private float calcTPS(long interval) {
return calcTPS(Duration.ofMillis(interval));
}

private float calcTPS(Duration interval) {
synchronized(this) {
tps = (float) period.toNanos() * count.get() / interval.toNanos();
Expand All @@ -174,8 +179,8 @@ private float calcTPS(Duration interval) {
}
long r = readings.getAndIncrement();
avg = (r * avg + tps) / ++r;
if (tps > peak) {
peak = Math.round(tps);
if (tps > peak.get()) {
peak.set(Math.round(tps));
peakWhen = Instant.now();
}
count.set(0);
Expand All @@ -185,8 +190,8 @@ private float calcTPS(Duration interval) {

private float calcTPS() {
synchronized(this) {
Instant now = Instant.now();
Duration interval = Duration.between(start, now);
Duration now = Duration.ofNanos(System.nanoTime());
Duration interval = now.minus(start);
if (interval.compareTo(period) >= 0) {
calcTPS(interval);
start = now;
Expand All @@ -196,13 +201,13 @@ private float calcTPS() {
}

public void setSimulatedNanoTime(long simulatedNanoTime) {
if (this.simulatedNanoTime == 0L)
start = Instant.ofEpochMilli(simulatedNanoTime / FROM_NANOS);
if (this.simulatedNanoTime.equals(Duration.ZERO))
start = Duration.ofNanos(simulatedNanoTime);

this.simulatedNanoTime = simulatedNanoTime;
this.simulatedNanoTime = Duration.ofNanos(simulatedNanoTime);
}

protected long getNanoTime() {
return simulatedNanoTime > 0L ? simulatedNanoTime : System.nanoTime();
return simulatedNanoTime.equals(Duration.ZERO) ? Duration.ofNanos(System.nanoTime()).toNanos() : simulatedNanoTime.toNanos();
}
}
67 changes: 34 additions & 33 deletions jpos/src/test/java/org/jpos/space/TSpacePerformanceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -40,20 +39,19 @@
*
* @author Robert Demski
*/
@SuppressWarnings("unchecked")
public class TSpacePerformanceTest {

LocalSpace<String,Object> sp1;
LocalSpace<String,Object> sp2;
List<Long> t1 = new ArrayList();
List<Long> t2 = new ArrayList();
final List<Long> t1 = new ArrayList<>();
final List<Long> t2 = new ArrayList<>();
// List t1 = Collections.synchronizedCollection(new ArrayList());
public static final int COUNT = 100000;
TPS tpsOut = new TPS(100L, false);
TPS tpsIn = new TPS(100L, false);
final TPS tpsOut = new TPS(100L, false);
final TPS tpsIn = new TPS(100L, false);

class WriteSpaceTask implements Runnable {
String key;
final String key;

WriteSpaceTask(String key){
this.key = key;
Expand All @@ -66,12 +64,12 @@ public void run (){
}
long stamp2 = System.nanoTime();
t1.add(stamp2-stamp);
System.err.println("Write "+key+" out: "+(stamp2-stamp)/1000000 + " " + tpsOut.toString());
System.err.println("Write "+key+" out: "+(stamp2-stamp)/1000000 + " " + tpsOut);
}
}

class ReadSpaceTask implements Runnable {
String key;
final String key;

ReadSpaceTask(String key){
this.key = key;
Expand All @@ -84,18 +82,17 @@ public void run (){
}
long stamp2 = System.nanoTime();
t2.add(stamp2-stamp);
System.err.println("Read "+key+" in: "+(stamp2-stamp)/1000000 + " " + tpsIn.toString());
System.err.println("Read "+key+" in: "+(stamp2-stamp)/1000000 + " " + tpsIn);
}
}

@SuppressWarnings("unchecked")
class WriteSpaceWithNotifyTask implements Runnable, SpaceListener<String,Object> {
String key;
LocalSpace sp1;
LocalSpace sp2;
final String key;
final LocalSpace<String,Object> sp1;
final LocalSpace<String,Object> sp2;
int count = 0;

WriteSpaceWithNotifyTask(String key, LocalSpace sp1, LocalSpace sp2){
WriteSpaceWithNotifyTask(String key, LocalSpace<String,Object> sp1, LocalSpace<String,Object> sp2){
this.key = key;
this.sp1 = sp1;
this.sp2 = sp2;
Expand All @@ -118,7 +115,7 @@ public void notify(String key, Object value) {
}

class WriteSpaceWithNotifyReadTask implements Runnable, SpaceListener<String,Object> {
String key;
final String key;

WriteSpaceWithNotifyReadTask(String key){
this.key = key;
Expand All @@ -136,8 +133,8 @@ public void notify(String key, Object value) {

@BeforeEach
public void setUp () {
sp1 = new TSpace<String,Object>();
sp2 = new TSpace<String,Object>();
sp1 = new TSpace<>();
sp2 = new TSpace<>();
t1.clear();
t2.clear();
}
Expand All @@ -154,11 +151,11 @@ private void printAvg(List<Long> times, String prefix){
}

@Test
public void testReadPerformance() throws Throwable {
public void testReadPerformance() throws InterruptedException {
int size = 10;
ExecutorService es = new ThreadPoolExecutor(size, Integer.MAX_VALUE,
30, TimeUnit.SECONDS, new SynchronousQueue());
((ThreadPoolExecutor)es).prestartAllCoreThreads();
ThreadPoolExecutor es = new ThreadPoolExecutor(size, Integer.MAX_VALUE,
30, TimeUnit.SECONDS, new SynchronousQueue<>());
es.prestartAllCoreThreads();

for (int i=0; i<size; i++)
es.execute(new WriteSpaceTask("PerformTask-"+i));
Expand All @@ -170,22 +167,24 @@ public void testReadPerformance() throws Throwable {
ISOUtil.sleep(500);
es.shutdown();
printAvg(t2, "Avg. read : ");
if (!es.awaitTermination(5, TimeUnit.SECONDS))
fail("Failed to shutdown ThreadPoolExecutor.");
}

@Test
public void testDeadLockWithNotify() throws Throwable {
public void testDeadLockWithNotify() throws InterruptedException {
int size = 10;
final ExecutorService es = new ThreadPoolExecutor(size*2, Integer.MAX_VALUE,
30, TimeUnit.SECONDS, new SynchronousQueue());
((ThreadPoolExecutor)es).prestartAllCoreThreads();
final ThreadPoolExecutor es = new ThreadPoolExecutor(size*2, Integer.MAX_VALUE,
30, TimeUnit.SECONDS, new SynchronousQueue<>());
es.prestartAllCoreThreads();

for (int i=0; i<size; i++)
es.execute(new WriteSpaceWithNotifyTask("WriteTask1-"+i,sp1,sp2));
for (int i=0; i<size; i++)
es.execute(new WriteSpaceWithNotifyTask("WriteTask2-"+i,sp2,sp1));

Instant stamp = Instant.now();
while (((ThreadPoolExecutor)es).getActiveCount() > 0) {
while (es.getActiveCount() > 0) {
if (Duration.between(stamp, Instant.now()).toMillis() < 10000){
ISOUtil.sleep(100);
continue;
Expand All @@ -199,16 +198,17 @@ public void testDeadLockWithNotify() throws Throwable {

// es.shutdown();
es.shutdownNow();
es.awaitTermination(5, TimeUnit.SECONDS);
if (!es.awaitTermination(5, TimeUnit.SECONDS))
fail("Failed to shutdown ThreadPoolExecutor.");
}

@Disabled("Remove it when TSpace can pass it")
@Test
public void testStolenEntryAtNotify() throws Throwable {
public void testStolenEntryAtNotify() throws InterruptedException {
int size = 10;
final ExecutorService es = new ThreadPoolExecutor(size*2, Integer.MAX_VALUE,
30, TimeUnit.SECONDS, new SynchronousQueue());
((ThreadPoolExecutor)es).prestartAllCoreThreads();
final ThreadPoolExecutor es = new ThreadPoolExecutor(size*2, Integer.MAX_VALUE,
30, TimeUnit.SECONDS, new SynchronousQueue<>());
es.prestartAllCoreThreads();

for (int i=0; i<size; i++)
es.execute(new WriteSpaceWithNotifyReadTask("WriteTask-"+i));
Expand All @@ -220,7 +220,8 @@ public void testStolenEntryAtNotify() throws Throwable {
assertNull(sp2.in("lost-entry", 200), "Detected stolen entry at notify");

es.shutdownNow();
// es.awaitTermination(5, TimeUnit.SECONDS);
if (!es.awaitTermination(5, TimeUnit.SECONDS))
fail("Failed to shutdown ThreadPoolExecutor.");
}

}
2 changes: 1 addition & 1 deletion jpos/src/test/java/org/jpos/util/TPSTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void test1000TPSManualUpdate() throws Exception {
Instant nowDone = Instant.now();
Thread.sleep (1050L - Duration.between(nowInit, Instant.now()).toMillis());
assertTrue(tps.intValue() >= 800, "Expected aprox 1000 TPS but was " + tps.intValue());
assertTrue(tps.intValue() >= 800, "Still expecting aprox 1000 TPS on a second call");
assertTrue(tps.intValue() >= 800, "Still expecting aprox 1000 TPS on a second call but was " + tps.intValue());
Thread.sleep (2500L - Duration.between(nowDone, Instant.now()).toMillis());
assertEquals(
0, tps.intValue(),
Expand Down