diff --git a/pom.xml b/pom.xml
index 5858ff3..e08d66c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1003,7 +1003,6 @@
org.awaitility
awaitility
${awaitility.version}
- test
diff --git a/src/main/java/org/thingsboard/tools/service/GatewayBaseTestExecutor.java b/src/main/java/org/thingsboard/tools/service/GatewayBaseTestExecutor.java
index 0af97d2..fc519c2 100644
--- a/src/main/java/org/thingsboard/tools/service/GatewayBaseTestExecutor.java
+++ b/src/main/java/org/thingsboard/tools/service/GatewayBaseTestExecutor.java
@@ -26,10 +26,10 @@
@ConditionalOnProperty(prefix = "test", value = "api", havingValue = "gateway")
public class GatewayBaseTestExecutor extends BaseTestExecutor {
- @Value("${gateway.createOnStart}")
+ @Value("${gateway.createOnStart:false}")
private boolean gatewayCreateOnStart;
- @Value("${gateway.deleteOnComplete}")
+ @Value("${gateway.deleteOnComplete:false}")
private boolean gatewayDeleteOnComplete;
@Autowired
diff --git a/src/main/java/org/thingsboard/tools/service/customer/DefaultCustomerManager.java b/src/main/java/org/thingsboard/tools/service/customer/DefaultCustomerManager.java
index ec75119..7aa2306 100644
--- a/src/main/java/org/thingsboard/tools/service/customer/DefaultCustomerManager.java
+++ b/src/main/java/org/thingsboard/tools/service/customer/DefaultCustomerManager.java
@@ -45,9 +45,9 @@ public class DefaultCustomerManager implements CustomerManager {
private static final ObjectMapper mapper = new ObjectMapper();
private final List customerIds = Collections.synchronizedList(new ArrayList<>(1024));
- @Value("${customer.startIdx}")
+ @Value("${customer.startIdx:0}")
int customerStartIdx;
- @Value("${customer.endIdx}")
+ @Value("${customer.endIdx:0}")
int customerEndIdx;
@Autowired
diff --git a/src/main/java/org/thingsboard/tools/service/dashboard/DefaultDashboardManager.java b/src/main/java/org/thingsboard/tools/service/dashboard/DefaultDashboardManager.java
index 2156288..c1b0544 100644
--- a/src/main/java/org/thingsboard/tools/service/dashboard/DefaultDashboardManager.java
+++ b/src/main/java/org/thingsboard/tools/service/dashboard/DefaultDashboardManager.java
@@ -42,9 +42,9 @@ public class DefaultDashboardManager implements DashboardManager {
@Autowired
private RestClientService restClientService;
- @Value("${dashboard.tenant:}")
+ @Value("${dashboard.tenant:alarms.json}")
private String[] tenantDashboards;
- @Value("${dashboard.shared:}")
+ @Value("${dashboard.shared:devices.json}")
private String sharedDashboardName;
@Value("${dashboard.deleteIfExists:false}")
private boolean deleteIfExists;
diff --git a/src/main/java/org/thingsboard/tools/service/device/CloudEventAPITest.java b/src/main/java/org/thingsboard/tools/service/device/CloudEventAPITest.java
new file mode 100644
index 0000000..d8d9c87
--- /dev/null
+++ b/src/main/java/org/thingsboard/tools/service/device/CloudEventAPITest.java
@@ -0,0 +1,393 @@
+/**
+ * Copyright © 2016-2022 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.tools.service.device;
+
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.mqtt.MqttQoS;
+import io.netty.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+import org.awaitility.Awaitility;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Service;
+import org.thingsboard.mqtt.MqttClient;
+import org.thingsboard.rest.client.RestClient;
+import org.thingsboard.server.common.data.Device;
+import org.thingsboard.server.common.data.edge.Edge;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EdgeId;
+import org.thingsboard.server.common.data.id.IdBased;
+import org.thingsboard.server.common.data.kv.TsKvEntry;
+import org.thingsboard.tools.service.mqtt.DeviceClient;
+import org.thingsboard.tools.service.msg.Msg;
+import org.thingsboard.tools.service.shared.BaseMqttAPITest;
+import org.thingsboard.tools.service.shared.CloudEventRestClientService;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.thingsboard.tools.service.msg.smartMeter.SmartMeterTelemetryGenerator.CREATE_TIME;
+
+@Slf4j
+@Service
+@ConditionalOnProperty(prefix = "device", value = "api", havingValue = "events_MQTT")
+public class CloudEventAPITest extends BaseMqttAPITest implements DeviceAPITest {
+ private static final AtomicInteger SUCCESS_MESSAGES_SENT_COUNTER = new AtomicInteger();
+ private static final AtomicInteger FAILED_MESSAGE_SENT_COUNTER = new AtomicInteger();
+ private static final AtomicInteger SUCCESS_RENAME_DEVICE_COUNTER = new AtomicInteger();
+ private static final AtomicInteger FAILED_RENAME_DEVICE_COUNTER = new AtomicInteger();
+ private static RestClient sourceClient;
+ private static RestClient targetClient;
+
+ private AtomicInteger waitTSCounter;
+ private CountDownLatch tsDurationLatch;
+ private CountDownLatch changeNameDurationLatch;
+ private Integer countOfAllTSMessage;
+
+ @Value("${rest.target_url}")
+ protected String targetUrl;
+ @Value("${rest.username}")
+ protected String username;
+ @Value("${rest.password}")
+ protected String password;
+ @Value("${device.check_attribute_delay}")
+ protected Integer checkAttributeDelay;
+ @Value("${rest.wait_ts_count}")
+ protected Integer waitTsCount;
+ @Value("${rest.source_type}")
+ protected String sourceType;
+ @Value("${rest.edge_id:}")
+ protected String edgeStringId;
+
+ @Override
+ public void createDevices() throws Exception {
+ initDeviceSuffix(sourceType.equals("CLOUD") ? "_TB" : "_" + edgeStringId);
+ createDevices(true);
+ }
+
+ @Override
+ public void removeDevices() throws Exception {
+ List entityIds = devices.stream().map(IdBased::getId).collect(Collectors.toList());
+ String typeDevice = "devices";
+
+ removeFromCloud(entityIds, typeDevice);
+ }
+
+ private void removeFromCloud(List entityIds, String typeDevice) throws InterruptedException {
+ if (sourceType.equals("CLOUD")) {
+ removeEntities(sourceClient, entityIds, typeDevice);
+ } else {
+ removeEntities(targetClient, entityIds, typeDevice);
+ }
+ }
+
+ @Override
+ public void runApiTests() throws InterruptedException {
+ prepareParameters();
+ prepareClients();
+ startPerformanceTest();
+ waitTSMessage();
+ }
+
+ private void prepareParameters() {
+ waitTSCounter = new AtomicInteger(waitTsCount);
+ tsDurationLatch = new CountDownLatch(testDurationInSec);
+ countOfAllTSMessage = testDurationInSec * testMessagesPerSecond;
+ changeNameDurationLatch = new CountDownLatch(testDurationInSec / checkAttributeDelay);
+ }
+
+ private void prepareClients() {
+ sourceClient = restClientService.getRestClient();
+ targetClient = new RestClient(targetUrl);
+ targetClient.login(username, password);
+
+ if (sourceType.equals("CLOUD")) {
+ assignDevicesToEdge();
+ }
+ }
+
+ private void assignDevicesToEdge() {
+ EdgeId edgeId = new EdgeId(UUID.fromString(edgeStringId));
+ Edge edge = sourceClient.getEdgeById(edgeId).orElseThrow(() -> new RuntimeException("Not found Edge by edge id in TB"));
+
+ for (Device device : devices) {
+ sourceClient.assignDeviceToEdge(edge.getId(), device.getId());
+ }
+ }
+
+ private void startPerformanceTest() throws InterruptedException {
+ log.info("Starting performance test for {} devices...", deviceCount);
+ CloudEventRestClientService cloudEventRestClientService = ((CloudEventRestClientService) restClientService);
+
+ for (int i = 0; i < testDurationInSec; i++) {
+ int iterationNumber = i;
+ restClientService.getScheduler().schedule(() -> runApiTestIteration(iterationNumber), i, TimeUnit.SECONDS);
+ cloudEventRestClientService.getChangeNameScheduler().schedule(() -> sendCloudEventMessage(iterationNumber), i, TimeUnit.SECONDS);
+ }
+
+ log.info("All iterations has been scheduled. Awaiting all iteration completion...");
+ tsDurationLatch.await((long) ((testDurationInSec + checkAttributeDelay) * 1.2), TimeUnit.SECONDS);
+ changeNameDurationLatch.await((long) ((testDurationInSec + checkAttributeDelay) * 1.2), TimeUnit.SECONDS);
+ log.info("Completed performance iteration. Success Sent: {}, Failed Sent: {}, Success Rename: {}, Failed Rename: {}",
+ SUCCESS_MESSAGES_SENT_COUNTER.get(), FAILED_MESSAGE_SENT_COUNTER.get(), SUCCESS_RENAME_DEVICE_COUNTER.get(), FAILED_RENAME_DEVICE_COUNTER.get());
+ }
+
+ protected void runApiTestIteration(int iteration) {
+ try {
+ log.info("[{} seconds] Starting performance iteration for {} {}...", iteration, mqttClients.size(), "devices");
+
+ CountDownLatch iterationLatch = new CountDownLatch(testMessagesPerSecond);
+ AtomicInteger successPublishedCount = new AtomicInteger();
+ AtomicInteger failedPublishedCount = new AtomicInteger();
+
+ sendTsMessage(iteration, successPublishedCount, failedPublishedCount, iterationLatch);
+
+ iterationLatch.await();
+
+ log.info("[{}] Completed performance iteration. Success: {}, Failed: {}", iteration, successPublishedCount.get(), failedPublishedCount.get());
+ tsDurationLatch.countDown();
+ } catch (Throwable t) {
+ log.warn("[{}] Failed to process iteration", iteration, t);
+ }
+ }
+
+ private void sendTsMessage(int iteration, AtomicInteger successPublishedCount, AtomicInteger failedPublishedCount, CountDownLatch iterationLatch) {
+ int deviceCount = deviceClients.size();
+ int msgCount = iteration * testMessagesPerSecond % deviceCount;
+ for (int i = 0; i < testMessagesPerSecond; i++) {
+ int index = msgCount % deviceCount;
+ DeviceClient client = deviceClients.get(index);
+ Msg message = getNextMessage(client.getDeviceName(), false);
+ sendMessageToDevice(iteration, client, message, successPublishedCount, failedPublishedCount, iterationLatch);
+
+ msgCount++;
+ }
+ }
+
+ private void sendMessageToDevice(int iteration, DeviceClient client, Msg message, AtomicInteger successPublishedCount,
+ AtomicInteger failedPublishedCount, CountDownLatch iterationLatch) {
+ Future futurePublish = client.getMqttClient()
+ .publish(getTestTopic(), Unpooled.wrappedBuffer(message.getData()), MqttQoS.AT_MOST_ONCE)
+ .addListener(future -> handleResult(iteration, client, successPublishedCount, failedPublishedCount, iterationLatch, future));
+
+ restClientService.getWorkers().submit(() -> futurePublish);
+ }
+
+ private void handleResult(int iteration, DeviceClient client, AtomicInteger successPublishedCount, AtomicInteger failedPublishedCount,
+ CountDownLatch iterationLatch, Future super Void> future) {
+ if (future.isSuccess()) {
+ SUCCESS_MESSAGES_SENT_COUNTER.incrementAndGet();
+ successPublishedCount.incrementAndGet();
+ logSuccessTestMessage(iteration, client);
+ } else {
+ FAILED_MESSAGE_SENT_COUNTER.incrementAndGet();
+ failedPublishedCount.incrementAndGet();
+ logFailureTestMessage(iteration, client, future);
+ }
+
+ iterationLatch.countDown();
+ }
+
+ private void sendCloudEventMessage(int iteration) {
+ if (iteration % checkAttributeDelay == 0) {
+ try {
+ Device deviceWithNewName = changeDeviceName(iteration);
+ checkDeviceName(deviceWithNewName);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private Device changeDeviceName(int iteration) throws InterruptedException {
+ Device device = devices.get(iteration);
+ log.info("[{}] Starting change Device name - {}", iteration, device.getName());
+
+ device.setName(device.getName() + "_" + iteration);
+ sourceClient.saveDevice(device);
+
+ log.info("[{}] Completed change Device name - {}", iteration, device.getName());
+
+ return device;
+ }
+
+ private void checkDeviceName(Device deviceWithNewName) {
+ AtomicInteger checkAttributeCount = new AtomicInteger(checkAttributeDelay);
+ Awaitility.await()
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(checkAttributeDelay, TimeUnit.SECONDS)
+ .until(() -> {
+ checkAttributeCount.decrementAndGet();
+ boolean checkSuccessful = false;
+ Optional cloudDevice = targetClient.getDeviceById(deviceWithNewName.getId());
+
+ if (cloudDevice.isPresent() && cloudDevice.get().getName().equals(deviceWithNewName.getName())) {
+ log.info("Success check Device Name - {}", cloudDevice.get().getName());
+ SUCCESS_RENAME_DEVICE_COUNTER.incrementAndGet();
+ changeNameDurationLatch.countDown();
+ checkSuccessful = true;
+ }
+ if (checkAttributeCount.get() == 0) {
+ log.info("Failed check Device Name - {}", deviceWithNewName.getName());
+ FAILED_RENAME_DEVICE_COUNTER.incrementAndGet();
+ changeNameDurationLatch.countDown();
+ checkSuccessful = false;
+ }
+ return checkSuccessful;
+ });
+ }
+
+ private void waitTSMessage() throws InterruptedException {
+ AtomicInteger findTsMessage = new AtomicInteger();
+ String key = "batteryLevel";
+ long endTime = CREATE_TIME + (long) countOfAllTSMessage;
+ long tsMessageByDevice = countOfAllTSMessage / devices.size();
+ long leftover = countOfAllTSMessage % devices.size();
+
+ for (Device device : devices) {
+ List deviceTs = targetClient.getTimeseries(device.getId(), Collections.singletonList(key), 1000L,
+ null, null, CREATE_TIME, endTime, Integer.MAX_VALUE, true);
+
+ if (deviceTs.size() == tsMessageByDevice) {
+ findTsMessage.addAndGet(deviceTs.size());
+ } else if (deviceTs.size() == tsMessageByDevice + 1 && leftover > 0) {
+ leftover--;
+ findTsMessage.addAndGet(deviceTs.size());
+ } else {
+ log.info("The TS check was missed because not everyone has arrived yet.");
+ break;
+ }
+ }
+
+ waitAndRetry(findTsMessage);
+ }
+
+ private void waitAndRetry(AtomicInteger findTsMessage) throws InterruptedException {
+ if (countOfAllTSMessage != findTsMessage.get() && waitTSCounter.get() != 1) {
+ log.info("Need wait before try check TS again - " + 10 + " seconds");
+
+ TimeUnit.SECONDS.sleep(10);
+ waitTSCounter.decrementAndGet();
+
+ waitTSMessage();
+ } else {
+ log.info("Sent TS: {}, Find TS: {}", countOfAllTSMessage, findTsMessage.get());
+ }
+ }
+
+ @Override
+ protected String getWarmUpTopic() {
+ return "v1/devices/me/telemetry";
+ }
+
+ @Override
+ protected String getTestTopic() {
+ return telemetryTest ? "v1/devices/me/telemetry" : "v1/devices/me/attributes";
+ }
+
+ @Override
+ protected void logSuccessTestMessage(int iteration, DeviceClient client) {
+ log.debug("[{}] Message was successfully published to device: {}", iteration, client.getDeviceName());
+ }
+
+ @Override
+ protected void logFailureTestMessage(int iteration, DeviceClient client, Future> future) {
+ log.error("[{}] Error while publishing message to device: [{}] {}", iteration, client.getDeviceName(), future.cause().getMessage());
+ }
+
+ @Override
+ public void connectDevices() throws InterruptedException {
+ AtomicInteger totalConnectedCount = new AtomicInteger();
+ List devicesNames = prepareDevicesNames();
+
+ connectDevicesBySequence(devicesNames, totalConnectedCount);
+ mapDevicesToDeviceClientConnections();
+
+ TimeUnit.SECONDS.sleep(5);
+ }
+
+ private List prepareDevicesNames() {
+ if (!devices.isEmpty()) {
+ return devices.stream().map(Device::getName).collect(Collectors.toList());
+ } else {
+ List devicesNames = new ArrayList<>();
+
+ for (int i = deviceStartIdx; i < deviceEndIdx; i++) {
+ devicesNames.add(getToken(false, i));
+ }
+
+ return devicesNames;
+ }
+ }
+
+ private void connectDevicesBySequence(List devicesNames, AtomicInteger totalConnectedCount) throws InterruptedException {
+ List pack = null;
+
+ for (String device : devicesNames) {
+ if (pack == null) {
+ pack = new ArrayList<>(warmUpPackSize);
+ }
+
+ pack.add(device);
+
+ if (pack.size() == warmUpPackSize) {
+ connectDevices(pack, totalConnectedCount, false);
+ TimeUnit.SECONDS.sleep(1 + random.nextInt(10));
+ pack = null;
+ }
+ }
+
+ if (pack != null) {
+ connectDevices(pack, totalConnectedCount, false);
+ }
+ }
+
+ private void mapDevicesToDeviceClientConnections() {
+ mqttClients.forEach(this::fillDeviceClients);
+
+ log.info("Sorting device clients...");
+ deviceClients.sort(Comparator.comparing(DeviceClient::getDeviceName));
+ log.info("Shuffling device clients...");
+ Collections.shuffle(deviceClients, random);
+ log.info("Mapping devices to device client connections done");
+ }
+
+ private void fillDeviceClients(MqttClient mqttClient) {
+ DeviceClient client = new DeviceClient();
+ client.setMqttClient(mqttClient);
+ client.setDeviceName(mqttClient.getClientConfig().getUsername());
+ deviceClients.add(client);
+ }
+
+ @Override
+ public void generationX509() {
+
+ }
+
+ @Override
+ protected void runApiTestIteration(int iteration, AtomicInteger totalSuccessPublishedCount, AtomicInteger totalFailedPublishedCount, CountDownLatch testDurationLatch) {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/thingsboard/tools/service/device/HttpDeviceAPITest.java b/src/main/java/org/thingsboard/tools/service/device/HttpDeviceAPITest.java
index b4b0006..9d9356b 100644
--- a/src/main/java/org/thingsboard/tools/service/device/HttpDeviceAPITest.java
+++ b/src/main/java/org/thingsboard/tools/service/device/HttpDeviceAPITest.java
@@ -76,7 +76,7 @@ public void createDevices() throws Exception {
@Override
public void removeDevices() throws Exception {
- removeEntities(devices.stream().map(IdBased::getId).collect(Collectors.toList()), "devices");
+ removeEntities(restClientService.getRestClient(), devices.stream().map(IdBased::getId).collect(Collectors.toList()), "devices");
}
@Override
diff --git a/src/main/java/org/thingsboard/tools/service/device/Lwm2mDeviceAPITest.java b/src/main/java/org/thingsboard/tools/service/device/Lwm2mDeviceAPITest.java
index 03e8ab3..18460dd 100644
--- a/src/main/java/org/thingsboard/tools/service/device/Lwm2mDeviceAPITest.java
+++ b/src/main/java/org/thingsboard/tools/service/device/Lwm2mDeviceAPITest.java
@@ -81,7 +81,7 @@ public void destroy() {
@Override
public void removeDevices() throws Exception {
- removeEntities(devices.stream().map(IdBased::getId).collect(Collectors.toList()), "lwm2m");
+ removeEntities(restClientService.getRestClient(), devices.stream().map(IdBased::getId).collect(Collectors.toList()), "lwm2m");
}
@Override
diff --git a/src/main/java/org/thingsboard/tools/service/device/MqttDeviceAPITest.java b/src/main/java/org/thingsboard/tools/service/device/MqttDeviceAPITest.java
index dab1073..555bdcb 100644
--- a/src/main/java/org/thingsboard/tools/service/device/MqttDeviceAPITest.java
+++ b/src/main/java/org/thingsboard/tools/service/device/MqttDeviceAPITest.java
@@ -40,9 +40,6 @@
@ConditionalOnProperty(prefix = "device", value = "api", havingValue = "MQTT")
public class MqttDeviceAPITest extends BaseMqttAPITest implements DeviceAPITest {
- static String dataAsStr = "{\"t1\":73}";
- static byte[] data = dataAsStr.getBytes(StandardCharsets.UTF_8);
-
@Override
public void createDevices() throws Exception {
createDevices(true);
@@ -50,7 +47,7 @@ public void createDevices() throws Exception {
@Override
public void removeDevices() throws Exception {
- removeEntities(devices.stream().map(IdBased::getId).collect(Collectors.toList()), "devices");
+ removeEntities(restClientService.getRestClient(), devices.stream().map(IdBased::getId).collect(Collectors.toList()), "devices");
}
@Override
@@ -63,11 +60,6 @@ protected String getWarmUpTopic() {
return "v1/devices/me/telemetry";
}
- @Override
- protected byte[] getData(String deviceName) {
- return data;
- }
-
@Override
protected void runApiTestIteration(int iteration, AtomicInteger totalSuccessPublishedCount, AtomicInteger totalFailedPublishedCount, CountDownLatch testDurationLatch) {
runApiTestIteration(iteration, totalSuccessPublishedCount, totalFailedPublishedCount, testDurationLatch, false);
diff --git a/src/main/java/org/thingsboard/tools/service/gateway/MqttGatewayAPITest.java b/src/main/java/org/thingsboard/tools/service/gateway/MqttGatewayAPITest.java
index c8207b0..9551575 100644
--- a/src/main/java/org/thingsboard/tools/service/gateway/MqttGatewayAPITest.java
+++ b/src/main/java/org/thingsboard/tools/service/gateway/MqttGatewayAPITest.java
@@ -161,11 +161,11 @@ protected void logFailureTestMessage(int iteration, DeviceClient client, Future<
@Override
public void removeGateways() throws Exception {
- removeEntities(gateways.stream().map(Device::getId).collect(Collectors.toList()), "gateways");
+ removeEntities(restClientService.getRestClient(), gateways.stream().map(Device::getId).collect(Collectors.toList()), "gateways");
}
@Override
public void removeDevices() throws Exception {
- removeEntities(devices.stream().map(IdBased::getId).collect(Collectors.toList()), "devices");
+ removeEntities(restClientService.getRestClient(), devices.stream().map(IdBased::getId).collect(Collectors.toList()), "devices");
}
}
diff --git a/src/main/java/org/thingsboard/tools/service/msg/smartMeter/SmartMeterTelemetryGenerator.java b/src/main/java/org/thingsboard/tools/service/msg/smartMeter/SmartMeterTelemetryGenerator.java
index b4ebe5a..8834f8e 100644
--- a/src/main/java/org/thingsboard/tools/service/msg/smartMeter/SmartMeterTelemetryGenerator.java
+++ b/src/main/java/org/thingsboard/tools/service/msg/smartMeter/SmartMeterTelemetryGenerator.java
@@ -24,13 +24,15 @@
import org.thingsboard.tools.service.msg.MessageGenerator;
import org.thingsboard.tools.service.msg.Msg;
+import java.util.concurrent.atomic.AtomicInteger;
+
@Slf4j
@Service(value = "randomTelemetryGenerator")
@ConditionalOnProperty(prefix = "test", value = "payloadType", havingValue = "SMART_METER")
public class SmartMeterTelemetryGenerator extends BaseMessageGenerator implements MessageGenerator {
-
static final int BATTERY_LEVEL_ALARM = 10;
-
+ public static long CREATE_TIME = System.currentTimeMillis();
+ static AtomicInteger COUNTER = new AtomicInteger(0);
@Override
public Msg getNextMessage(String deviceName, boolean shouldTriggerAlarm) {
byte[] payload;
@@ -43,12 +45,13 @@ public Msg getNextMessage(String deviceName, boolean shouldTriggerAlarm) {
} else {
tsNode = data;
}
- tsNode.put("ts", System.currentTimeMillis());
+ tsNode.put("ts", CREATE_TIME + COUNTER.get());
ObjectNode values = tsNode.putObject("values");
values.put("pulseCounter", random.nextInt(1000000));
values.put("leakage", random.nextInt(100) > 1); // leakage true in 1% cases
values.put("batteryLevel", shouldTriggerAlarm ? BATTERY_LEVEL_ALARM : random.nextInt(50) + 50);
payload = mapper.writeValueAsBytes(data);
+ COUNTER.incrementAndGet();
} catch (Exception e) {
log.warn("Failed to generate message", e);
throw new RuntimeException(e);
diff --git a/src/main/java/org/thingsboard/tools/service/shared/AbstractAPITest.java b/src/main/java/org/thingsboard/tools/service/shared/AbstractAPITest.java
index e088d3c..b70150b 100644
--- a/src/main/java/org/thingsboard/tools/service/shared/AbstractAPITest.java
+++ b/src/main/java/org/thingsboard/tools/service/shared/AbstractAPITest.java
@@ -23,6 +23,7 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.HttpServerErrorException;
+import org.thingsboard.rest.client.RestClient;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
@@ -51,7 +52,6 @@
@Slf4j
public abstract class AbstractAPITest {
-
protected static ObjectMapper mapper = new ObjectMapper();
protected ScheduledFuture> reportScheduledFuture;
@@ -75,7 +75,7 @@ public abstract class AbstractAPITest {
@Value("${test.instanceIdxRegex:([0-9]+)$}")
protected String instanceIdxRegex;
- @Value("${test.sequential:true}")
+ @Value("${test.sequential:false}")
protected boolean sequentialTest;
@Value("${test.telemetry:true}")
protected boolean telemetryTest;
@@ -120,6 +120,8 @@ public abstract class AbstractAPITest {
protected int deviceEndIdx;
protected int instanceIdx;
+ private String suffix = "";
+
@PostConstruct
protected void init() {
random = new Random(seed);
@@ -159,6 +161,10 @@ protected void destroy() {
}
}
+ protected void initDeviceSuffix(String suffix){
+ this.suffix = suffix;
+ }
+
protected void createDevices(boolean setCredentials) throws Exception {
List entities = createEntities(deviceStartIdx, deviceEndIdx, false, setCredentials);
devices = Collections.synchronizedList(entities);
@@ -184,14 +190,14 @@ protected void runApiTests(int deviceCount) throws InterruptedException {
protected abstract void runApiTestIteration(int iteration, AtomicInteger totalSuccessPublishedCount, AtomicInteger totalFailedPublishedCount, CountDownLatch testDurationLatch);
- protected void removeEntities(List entityIds, String typeDevice) throws InterruptedException {
+ protected void removeEntities(RestClient restClient, List entityIds, String typeDevice) throws InterruptedException {
log.info("Removing [{}] [{}]...", typeDevice, entityIds.size());
CountDownLatch latch = new CountDownLatch(entityIds.size());
AtomicInteger count = new AtomicInteger();
for (DeviceId entityId : entityIds) {
restClientService.getHttpExecutor().submit(() -> {
try {
- restClientService.getRestClient().deleteDevice(entityId);
+ restClient.deleteDevice(entityId);
count.getAndIncrement();
} catch (Exception e) {
log.error("Error while deleting [{}]", typeDevice, getHttpErrorException(e));
@@ -281,7 +287,7 @@ protected List createEntities(int startIdx, int endIdx, boolean isGatewa
}
protected String getToken(boolean isGateway, int token) {
- return (isGateway ? "GW" : "DW") + String.format("%8d", token).replace(" ", "0");
+ return (isGateway ? "GW" : "DW") + String.format("%8d", token).replace(" ", "0") + suffix;
}
protected Msg getNextMessage(String deviceName, boolean alarmRequired) {
diff --git a/src/main/java/org/thingsboard/tools/service/shared/BaseMqttAPITest.java b/src/main/java/org/thingsboard/tools/service/shared/BaseMqttAPITest.java
index 15f57a6..1dcc4ea 100644
--- a/src/main/java/org/thingsboard/tools/service/shared/BaseMqttAPITest.java
+++ b/src/main/java/org/thingsboard/tools/service/shared/BaseMqttAPITest.java
@@ -47,7 +47,8 @@
@Slf4j
public abstract class BaseMqttAPITest extends AbstractAPITest {
-
+ private static final String DATA_AS_STR = "{\"t1\":73}";
+ private static final byte[] DEFAULT_DATA = DATA_AS_STR.getBytes(StandardCharsets.UTF_8);
private static final int CONNECT_TIMEOUT = 5;
private EventLoopGroup EVENT_LOOP_GROUP;
@@ -55,11 +56,11 @@ public abstract class BaseMqttAPITest extends AbstractAPITest {
private String mqttHost;
@Value("${mqtt.port}")
private int mqttPort;
- @Value("${mqtt.ssl.enabled}")
+ @Value("${mqtt.ssl.enabled:false}")
boolean mqttSslEnabled;
- @Value("${mqtt.ssl.key_store}")
+ @Value("${mqtt.ssl.key_store:}")
String mqttSslKeyStore;
- @Value("${mqtt.ssl.key_store_password}")
+ @Value("${mqtt.ssl.key_store_password:}")
String mqttSslKeyStorePassword;
protected final List mqttClients = Collections.synchronizedList(new ArrayList<>(1024 * 16));
@@ -131,7 +132,9 @@ private void sendAndWaitPack(List pack, AtomicInteger totalWarmedU
protected abstract String getWarmUpTopic();
- protected abstract byte[] getData(String deviceName);
+ protected byte[] getData(String deviceName){
+ return DEFAULT_DATA;
+ }
protected DeviceClient getDeviceClient(Set iterationDevices, int iteration, int msgOffsetIdx) {
DeviceClient client;
diff --git a/src/main/java/org/thingsboard/tools/service/shared/BaseTestExecutor.java b/src/main/java/org/thingsboard/tools/service/shared/BaseTestExecutor.java
index 5d3089b..8ea56de 100644
--- a/src/main/java/org/thingsboard/tools/service/shared/BaseTestExecutor.java
+++ b/src/main/java/org/thingsboard/tools/service/shared/BaseTestExecutor.java
@@ -27,16 +27,16 @@
@Slf4j
public abstract class BaseTestExecutor implements TestExecutor {
- @Value("${dashboard.createOnStart}")
+ @Value("${dashboard.createOnStart:false}")
private boolean dashboardCreateOnStart;
- @Value("${dashboard.deleteOnComplete}")
+ @Value("${dashboard.deleteOnComplete:false}")
private boolean dashboardDeleteOnComplete;
- @Value("${customer.createOnStart}")
+ @Value("${customer.createOnStart:false}")
private boolean customerCreateOnStart;
- @Value("${customer.deleteOnComplete}")
+ @Value("${customer.deleteOnComplete:false}")
private boolean customerDeleteOnComplete;
@Value("${device.createOnStart}")
@@ -51,10 +51,10 @@ public abstract class BaseTestExecutor implements TestExecutor {
@Value("${test.enabled:true}")
protected boolean testEnabled;
- @Value("${test.updateRootRuleChain:true}")
+ @Value("${test.updateRootRuleChain:false}")
protected boolean updateRootRuleChain;
- @Value("${test.revertRootRuleChain:true}")
+ @Value("${test.revertRootRuleChain:false}")
protected boolean revertRootRuleChain;
@Autowired
diff --git a/src/main/java/org/thingsboard/tools/service/shared/CloudEventRestClientService.java b/src/main/java/org/thingsboard/tools/service/shared/CloudEventRestClientService.java
new file mode 100644
index 0000000..93c99fa
--- /dev/null
+++ b/src/main/java/org/thingsboard/tools/service/shared/CloudEventRestClientService.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright © 2016-2022 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.tools.service.shared;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
+import org.springframework.stereotype.Service;
+import org.thingsboard.common.util.ThingsBoardThreadFactory;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+@Getter @Slf4j
+@Service
+@ConditionalOnExpression("'${device.api:null}'=='events_MQTT'")
+public class CloudEventRestClientService extends DefaultRestClientService implements RestClientService {
+ protected final ScheduledExecutorService changeNameScheduler = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("scheduler"));
+
+ @Override
+ public void destroy() {
+ super.destroy();
+ if (!this.changeNameScheduler.isShutdown()) {
+ this.changeNameScheduler.shutdownNow();
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/thingsboard/tools/service/shared/DefaultRestClientService.java b/src/main/java/org/thingsboard/tools/service/shared/DefaultRestClientService.java
index 54c53d7..5cecbc6 100644
--- a/src/main/java/org/thingsboard/tools/service/shared/DefaultRestClientService.java
+++ b/src/main/java/org/thingsboard/tools/service/shared/DefaultRestClientService.java
@@ -22,6 +22,7 @@
import org.eclipse.californium.elements.util.ExecutorsUtil;
import org.eclipse.californium.elements.util.NamedThreadFactory;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
@@ -45,6 +46,7 @@
@Slf4j
@Service
+@ConditionalOnExpression("'${device.api:null}'!='events_MQTT'")
public class DefaultRestClientService implements RestClientService {
public static final int LOG_PAUSE = 1;
@@ -66,7 +68,7 @@ public class DefaultRestClientService implements RestClientService {
private String username;
@Value("${rest.password}")
private String password;
- @Value("${rest.connect_server}")
+ @Value("${rest.connect_server:true}")
private boolean connectServer;
static {
@@ -110,9 +112,9 @@ public boolean verify(String hostname, SSLSession session) {
}
@Getter
- private RestClient restClient;
+ protected RestClient restClient;
@Getter
- private EventLoopGroup eventLoopGroup;
+ protected EventLoopGroup eventLoopGroup;
@Override
public ExecutorService getWorkers() {
diff --git a/src/main/resources/events-performance-tests.yml b/src/main/resources/events-performance-tests.yml
new file mode 100644
index 0000000..c6dd4d7
--- /dev/null
+++ b/src/main/resources/events-performance-tests.yml
@@ -0,0 +1,54 @@
+#
+# Copyright © 2016-2022 The Thingsboard Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+spring.main.web-application-type: NONE
+
+rest:
+ # true - when need to assign device to edge.
+ source_type: "${SOURCE_TYPE:EDGE}"
+ url: "${SOURCE_URL:http://localhost:18080}"
+ target_url: "${TARGET_URL:http://localhost:8080}"
+ edge_id: "${EDGE_ID:f0d0d470-b098-11ef-bf09-3fc3f309b24e}"
+ username: "${REST_USERNAME:tenant@thingsboard.org}"
+ password: "${REST_PASSWORD:tenant}"
+ pool_size: "${REST_POOL_SIZE:4}"
+ wait_ts_count: "${WAIT_TS_COUNT:10}"
+mqtt:
+ host: "${MQTT_HOST:localhost}"
+ port: "${MQTT_PORT:1883}"
+device:
+ # Device API to use - events_MQTT
+ api: "${DEVICE_API:events_MQTT}"
+ startIdx: "${DEVICE_START_IDX:0}"
+ endIdx: "${DEVICE_END_IDX:100}"
+ count: "${DEVICE_COUNT:100}" # count of devices to be used by clients in k8s deployment run
+ createOnStart: "${DEVICE_CREATE_ON_START:true}"
+ deleteOnComplete: "${DEVICE_DELETE_ON_COMPLETE:false}"
+ check_attribute_delay: "${CHECK_ATTRIBUTE_DELAY:10}"
+warmup:
+ enabled: "${WARMUP_ENABLED:true}"
+ packSize: "${WARMUP_PACK_SIZE:100}"
+test:
+ # Type of the payload to send: DEFAULT, SMART_TRACKER, SMART_METER
+ # RANDOM - TODO: add description
+ # SMART_TRACKER - sample payload: {"latitude": 42.222222, "longitude": 73.333333, "speed": 55.5, "fuel": 92, "batteryLevel": 81}
+ # SMART_METER - sample payload: {"pulseCounter": 1234567, "leakage": false, "batteryLevel": 81}
+ payloadType: "${TEST_PAYLOAD_TYPE:SMART_METER}" # device profile name
+ # Test API to use - device, gateway or lwm2m
+ # If Device API == LWM2M only - lwm2m
+ api: "${TEST_API:device}"
+ mps: "${MESSAGES_PER_SECOND:1000}"
+ duration: "${DURATION_IN_SECONDS:300}"
\ No newline at end of file
diff --git a/start-events-performance-cloud.sh b/start-events-performance-cloud.sh
new file mode 100755
index 0000000..27d5f51
--- /dev/null
+++ b/start-events-performance-cloud.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+#
+# Copyright © 2016-2022 The Thingsboard Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+echo "Starting ThingsBoard Performance Test..."
+
+#CLOUD, EDGE
+export SOURCE_TYPE=CLOUD
+export SOURCE_URL=http://127.0.0.1:8080
+export MQTT_HOST=127.0.0.1
+export MQTT_PORT=1883
+export TARGET_URL=http://127.0.0.1:18080
+export EDGE_ID=f0d0d470-b098-11ef-bf09-3fc3f309b24e
+
+export DEVICE_START_IDX=0
+export DEVICE_END_IDX=100
+export DEVICE_COUNT=100
+export DEVICE_CREATE_ON_START=true
+export DEVICE_DELETE_ON_COMPLETE=true
+
+export MESSAGES_PER_SECOND=1000
+export DURATION_IN_SECONDS=60
+export CHECK_ATTRIBUTE_DELAY=10
+export WAIT_TS_COUNT=10
+
+mvn spring-boot:run -Dspring-boot.run.arguments="--spring.config.name=events-performance-tests"
diff --git a/start-events-performance-edge.sh b/start-events-performance-edge.sh
new file mode 100755
index 0000000..9f2c15c
--- /dev/null
+++ b/start-events-performance-edge.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+#
+# Copyright © 2016-2022 The Thingsboard Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+echo "Starting ThingsBoard Performance Test..."
+
+#CLOUD, EDGE
+export SOURCE_TYPE=EDGE
+export SOURCE_URL=http://127.0.0.1:18080
+export MQTT_HOST=127.0.0.1
+export MQTT_PORT=11883
+export TARGET_URL=http://127.0.0.1:8080
+
+export DEVICE_START_IDX=0
+export DEVICE_END_IDX=100
+export DEVICE_COUNT=100
+export DEVICE_CREATE_ON_START=true
+export DEVICE_DELETE_ON_COMPLETE=true
+
+export MESSAGES_PER_SECOND=1000
+export DURATION_IN_SECONDS=60
+export CHECK_ATTRIBUTE_DELAY=10
+export WAIT_TS_COUNT=100
+
+mvn spring-boot:run -Dspring-boot.run.arguments="--spring.config.name=events-performance-tests"