From 586cc954f09dadf8605f2502d1c3ab16d05a168e Mon Sep 17 00:00:00 2001 From: Uarbekov Date: Tue, 18 Jul 2023 15:38:16 +0600 Subject: [PATCH 1/5] feat(uat): add t20 scenario with steps for large message --- .../client/control/api/addon/EventFilter.java | 15 +++ .../addon/MqttMessageEvent.java | 13 +- .../greengrass/steps/MqttControlSteps.java | 72 ++++++++++- .../greengrass/features/ggmq-1.feature | 113 ++++++++++++++++++ 4 files changed, 205 insertions(+), 8 deletions(-) diff --git a/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/api/addon/EventFilter.java b/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/api/addon/EventFilter.java index af1361930..90b236a17 100644 --- a/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/api/addon/EventFilter.java +++ b/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/api/addon/EventFilter.java @@ -35,6 +35,7 @@ public final class EventFilter { private final Integer messageExpiryInterval; private final String responseTopic; private final byte[] correlationData; + private final Boolean isMessageFull; EventFilter(Builder builder) { super(); @@ -55,6 +56,7 @@ public final class EventFilter { this.messageExpiryInterval = builder.messageExpiryInterval; this.responseTopic = builder.responseTopic; this.correlationData = builder.correlationData; + this.isMessageFull = builder.isMessageFull; } /** @@ -78,6 +80,7 @@ public static class Builder { private Integer messageExpiryInterval; private String responseTopic; private byte[] correlationData; + private Boolean isMessageFull; /** * Sets type of event. @@ -298,6 +301,18 @@ public Builder withCorrelationData(byte[] correlationData) { return this; } + + /** + * Sets indicator of message is full or not. + * Applicable only for MQTT message events + * + * @param isMessageFull is message full or not + */ + public Builder withIsMessageFull(Boolean isMessageFull) { + this.isMessageFull = isMessageFull; + return this; + } + public EventFilter build() { return new EventFilter(this); } diff --git a/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/implementation/addon/MqttMessageEvent.java b/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/implementation/addon/MqttMessageEvent.java index c378e7225..565e8f682 100644 --- a/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/implementation/addon/MqttMessageEvent.java +++ b/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/implementation/addon/MqttMessageEvent.java @@ -13,6 +13,7 @@ import lombok.Getter; import lombok.NonNull; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; @@ -111,7 +112,7 @@ public boolean isMatched(@NonNull EventFilter filter) { // TODO: check QoS ? it can be differ on transmit and receive sides // check content - return comparePayload(filter.getContent()); + return comparePayload(filter.getContent(), filter.getIsMessageFull()); } @SuppressWarnings("PMD.CompareObjectsWithEquals") @@ -134,7 +135,7 @@ private boolean compareConnection(ConnectionControl expectedConnectionControl, S return connectionName == null || connectionName.equals(getConnectionName()); } - private boolean comparePayload(byte[] expected) { + private boolean comparePayload(byte[] expected, Boolean isMessageFull) { if (expected == null) { return true; } @@ -144,7 +145,13 @@ private boolean comparePayload(byte[] expected) { return false; } - return Arrays.equals(expected, byteStringPayload.toByteArray()); + if (isMessageFull == null || isMessageFull) { + return Arrays.equals(expected, byteStringPayload.toByteArray()); + } else { + String expectedBeginningMessage = new String(expected, StandardCharsets.UTF_8); + String actualMessage = byteStringPayload.toString(StandardCharsets.UTF_8); + return actualMessage.contains(expectedBeginningMessage); + } } private boolean isRetainMatched(Boolean retain) { diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java b/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java index 9230d57d6..6c32f223b 100644 --- a/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java +++ b/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java @@ -58,12 +58,14 @@ import software.amazon.awssdk.utils.CollectionUtils; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -917,13 +919,50 @@ private void subscribe(@NonNull String clientDeviceId, @NonNull String topicFilt } } + /** + * Publish the large MQTT message. + * + * @param clientDeviceId user defined client device id + * @param topicString the topic to publish message + * @param qos the value of MQTT QoS for publishing + * @param messageBeginning the content of message with beginning to publish + * @param kbSize the size of message + * @throws StatusRuntimeException on gRPC errors + * @throws IllegalArgumentException on invalid QoS argument + */ + @When("I publish from {string} to {string} with qos {int} and large message with beginning of {string} " + + "with size {int} KB") + public void publishLargeMessage(String clientDeviceId, String topicString, int qos, + String messageBeginning, int kbSize) { + int lengthInBytes = kbSize * 1028; + // 1 char is 2 bytes + int fullMessageSize = lengthInBytes / 2; + + if (messageBeginning.length() > fullMessageSize) { + throw new RuntimeException("messageBeginning is larger than expected size"); + } + + byte[] randomMessageBytes = new byte[fullMessageSize - messageBeginning.length()]; + + new Random().nextBytes(randomMessageBytes); + String randomMessage = new String(randomMessageBytes, StandardCharsets.UTF_8); + + while (randomMessage.contains(messageBeginning)) { + new Random().nextBytes(randomMessageBytes); + randomMessage = new String(randomMessageBytes, StandardCharsets.UTF_8); + } + + String message = messageBeginning + randomMessage; + publish(clientDeviceId, topicString, qos, message, PublishReasonCode.SUCCESS.getValue()); + } + /** * Publish the MQTT message. * * @param clientDeviceId user defined client device id * @param topicString the topic to publish message * @param qos the value of MQTT QoS for publishing - * @param message the the content of message to publish + * @param message the content of message to publish * @throws StatusRuntimeException on gRPC errors * @throws IllegalArgumentException on invalid QoS argument */ @@ -938,7 +977,7 @@ public void publish(String clientDeviceId, String topicString, int qos, String m * @param clientDeviceId user defined client device id * @param topicString the topic to publish message * @param qos the value of MQTT QoS for publishing - * @param message the the content of message to publish + * @param message the content of message to publish * @param expectedStatus the status of MQTT QoS for publish reply * @throws StatusRuntimeException on gRPC errors * @throws IllegalArgumentException on invalid QoS argument @@ -987,7 +1026,7 @@ public void publish(String clientDeviceId, String topicString, int qos, String m @And("message {string} is not received on {string} from {string} topic within {int} {word}") public void notReceivedMessage(String message, String clientDeviceId, String topicString, int value, String unit) throws TimeoutException, InterruptedException { - receive(message, clientDeviceId, topicString, value, unit, false); + receive(message, clientDeviceId, topicString, value, unit, false, false); } /** @@ -1006,7 +1045,28 @@ public void notReceivedMessage(String message, String clientDeviceId, String top @And("message {string} received on {string} from {string} topic within {int} {word}") public void receivedMessage(String message, String clientDeviceId, String topicString, int value, String unit) throws TimeoutException, InterruptedException { - receive(message, clientDeviceId, topicString, value, unit, true); + receive(message, clientDeviceId, topicString, value, unit, true, true); + } + + + /** + * Verify is MQTT message is received in limited duration of time. + * + * @param message content of message to receive + * @param clientDeviceId the user defined client device id + * @param topicString the topic (not a filter) which message has been sent + * @param value the duration of time to wait for message + * @param unit the time unit to wait + * @throws TimeoutException when matched message was not received in specified duration of time + * @throws RuntimeException on internal errors + * @throws InterruptedException then thread has been interrupted + */ + @SuppressWarnings("PMD.UseObjectForClearerAPI") + @And("message beginning with {string} received on {string} from {string} topic within {int} {word}") + public void receivedMessageBeginning(String message, String clientDeviceId, String topicString, int value, + String unit) + throws TimeoutException, InterruptedException { + receive(message, clientDeviceId, topicString, value, unit, true, false); } /** @@ -1018,13 +1078,14 @@ public void receivedMessage(String message, String clientDeviceId, String topicS * @param value the duration of time to wait for message * @param unit the time unit to wait * @param isExpectedMessage used for setting message expectation + * @param isMessageFull used for setting full message expectation * @throws TimeoutException when matched message was not received in specified duration of time * @throws RuntimeException on internal errors * @throws InterruptedException then thread has been interrupted */ @SuppressWarnings("PMD.UseObjectForClearerAPI") public void receive(String message, String clientDeviceId, String topicString, int value, - String unit, boolean isExpectedMessage) + String unit, boolean isExpectedMessage, boolean isMessageFull) throws TimeoutException, InterruptedException { // getting connectionControl by clientDeviceId final String clientDeviceThingName = getClientDeviceThingName(clientDeviceId); @@ -1045,6 +1106,7 @@ public void receive(String message, String clientDeviceId, String topicString, i .withMessageExpiryInterval(rxMessageExpiryInterval) .withResponseTopic(rxResponseTopic) .withCorrelationData(rxCorrelationData) + .withIsMessageFull(isMessageFull) .build(); // convert time units TimeUnit timeUnit = TimeUnit.valueOf(unit.toUpperCase()); diff --git a/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature b/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature index 67855e5ce..373cfcdde 100644 --- a/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature +++ b/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature @@ -1227,6 +1227,119 @@ Feature: GGMQ-1 | mqtt-v | name | agent | recipe | | v5 | paho-python | aws.greengrass.client.Mqtt5PythonPahoClient | client_python_paho.yaml | + @GGMQ-1-T22 + Scenario Outline: GGMQ-1-T22--: As a customer, I can send a message of size 128KiB to the MQTT broker + When I create a Greengrass deployment with components + | aws.greengrass.clientdevices.Auth | LATEST | + | aws.greengrass.clientdevices.mqtt.EMQX | LATEST | + | aws.greengrass.clientdevices.IPDetector | LATEST | + | aws.greengrass.clientdevices.mqtt.Bridge | LATEST | + | | classpath:/local-store/recipes/ | + And I create client device "large_payload_publisher" + When I associate "large_payload_publisher" with ggc + And I update my Greengrass deployment configuration, setting the component aws.greengrass.clientdevices.Auth configuration to: + """ +{ + "MERGE":{ + "deviceGroups":{ + "formatVersion":"2021-03-05", + "definitions":{ + "MyPermissiveDeviceGroup":{ + "selectionRule":"thingName: ${large_payload_publisher}", + "policyName":"MyPermissivePolicy" + } + }, + "policies":{ + "MyPermissivePolicy":{ + "AllowAll":{ + "statementDescription":"Allow client devices to perform all actions.", + "operations":[ + "*" + ], + "resources":[ + "*" + ] + } + } + } + } + } +} + """ + And I update my Greengrass deployment configuration, setting the component configuration to: + """ +{ + "MERGE":{ + "controlAddresses":"${mqttControlAddresses}", + "controlPort":"${mqttControlPort}" + } +} + """ + And I update my Greengrass deployment configuration, setting the component aws.greengrass.clientdevices.mqtt.Bridge configuration to: + """ +{ + "MERGE":{ + "mqttTopicMapping":{ + "mapping1:":{ + "topic":"${large_payload_publisher}topic/to/iotcore", + "source":"LocalMqtt", + "target":"IotCore" + } + } + } +} + """ + And I deploy the Greengrass deployment configuration + Then the Greengrass deployment is COMPLETED on the device after 5 minutes + And the aws.greengrass.clientdevices.mqtt.EMQX log on the device contains the line "is running now!." within 1 minutes + + And I discover core device broker as "default_broker" from "large_payload_publisher" in OTF + And I connect device "large_payload_publisher" on to "default_broker" using mqtt "" + + And I subscribe "large_payload_publisher" to "${large_payload_publisher}topic/to/iotcore" with qos 1 + When I publish from "large_payload_publisher" to "${large_payload_publisher}topic/to/iotcore" with qos 1 and large message with beginning of "Hello world1" with size 128 KB + Then message beginning with "Hello world1" received on "large_payload_publisher" from "${large_payload_publisher}topic/to/iotcore" topic within 10 seconds + + @mqtt3 @sdk-java + Examples: + | mqtt-v | name | agent | recipe | + | v3 | sdk-java | aws.greengrass.client.Mqtt5JavaSdkClient | client_java_sdk.yaml | + + @mqtt3 @mosquitto-c @SkipOnWindows + Examples: + | mqtt-v | name | agent | recipe | + | v3 | mosquitto-c | aws.greengrass.client.MqttMosquittoClient | client_mosquitto_c.yaml | + + @mqtt3 @paho-java + Examples: + | mqtt-v | name | agent | recipe | + | v3 | paho-java | aws.greengrass.client.Mqtt5JavaPahoClient | client_java_paho.yaml | + + @mqtt3 @paho-python @SkipOnWindows + Examples: + | mqtt-v | name | agent | recipe | + | v3 | paho-python | aws.greengrass.client.Mqtt5PythonPahoClient | client_python_paho.yaml | + + @mqtt5 @sdk-java + Examples: + | mqtt-v | name | agent | recipe | + | v5 | sdk-java | aws.greengrass.client.Mqtt5JavaSdkClient | client_java_sdk.yaml | + + @mqtt5 @mosquitto-c @SkipOnWindows + Examples: + | mqtt-v | name | agent | recipe | + | v5 | mosquitto-c | aws.greengrass.client.MqttMosquittoClient | client_mosquitto_c.yaml | + + @mqtt5 @paho-java + Examples: + | mqtt-v | name | agent | recipe | + | v5 | paho-java | aws.greengrass.client.Mqtt5JavaPahoClient | client_java_paho.yaml | + + @mqtt5 @paho-python @SkipOnWindows + Examples: + | mqtt-v | name | agent | recipe | + | v5 | paho-python | aws.greengrass.client.Mqtt5PythonPahoClient | client_python_paho.yaml | + @GGMQ-1-T101 Scenario Outline: GGMQ-1-T101--: As a customer, I can use publish retain flag using MQTT V3.1.1 From 29f79187174dd5710c02c749df6632e330bc6cf2 Mon Sep 17 00:00:00 2001 From: Uarbekov Date: Tue, 18 Jul 2023 18:45:11 +0600 Subject: [PATCH 2/5] fix(uat): fix generating long message by repeating --- .../client/control/api/addon/EventFilter.java | 15 -------- .../addon/MqttMessageEvent.java | 13 ++----- .../greengrass/steps/MqttControlSteps.java | 35 +++++++++---------- 3 files changed, 20 insertions(+), 43 deletions(-) diff --git a/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/api/addon/EventFilter.java b/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/api/addon/EventFilter.java index 90b236a17..af1361930 100644 --- a/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/api/addon/EventFilter.java +++ b/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/api/addon/EventFilter.java @@ -35,7 +35,6 @@ public final class EventFilter { private final Integer messageExpiryInterval; private final String responseTopic; private final byte[] correlationData; - private final Boolean isMessageFull; EventFilter(Builder builder) { super(); @@ -56,7 +55,6 @@ public final class EventFilter { this.messageExpiryInterval = builder.messageExpiryInterval; this.responseTopic = builder.responseTopic; this.correlationData = builder.correlationData; - this.isMessageFull = builder.isMessageFull; } /** @@ -80,7 +78,6 @@ public static class Builder { private Integer messageExpiryInterval; private String responseTopic; private byte[] correlationData; - private Boolean isMessageFull; /** * Sets type of event. @@ -301,18 +298,6 @@ public Builder withCorrelationData(byte[] correlationData) { return this; } - - /** - * Sets indicator of message is full or not. - * Applicable only for MQTT message events - * - * @param isMessageFull is message full or not - */ - public Builder withIsMessageFull(Boolean isMessageFull) { - this.isMessageFull = isMessageFull; - return this; - } - public EventFilter build() { return new EventFilter(this); } diff --git a/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/implementation/addon/MqttMessageEvent.java b/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/implementation/addon/MqttMessageEvent.java index 565e8f682..c378e7225 100644 --- a/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/implementation/addon/MqttMessageEvent.java +++ b/uat/mqtt-client-control/src/main/java/com/aws/greengrass/testing/mqtt/client/control/implementation/addon/MqttMessageEvent.java @@ -13,7 +13,6 @@ import lombok.Getter; import lombok.NonNull; -import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; @@ -112,7 +111,7 @@ public boolean isMatched(@NonNull EventFilter filter) { // TODO: check QoS ? it can be differ on transmit and receive sides // check content - return comparePayload(filter.getContent(), filter.getIsMessageFull()); + return comparePayload(filter.getContent()); } @SuppressWarnings("PMD.CompareObjectsWithEquals") @@ -135,7 +134,7 @@ private boolean compareConnection(ConnectionControl expectedConnectionControl, S return connectionName == null || connectionName.equals(getConnectionName()); } - private boolean comparePayload(byte[] expected, Boolean isMessageFull) { + private boolean comparePayload(byte[] expected) { if (expected == null) { return true; } @@ -145,13 +144,7 @@ private boolean comparePayload(byte[] expected, Boolean isMessageFull) { return false; } - if (isMessageFull == null || isMessageFull) { - return Arrays.equals(expected, byteStringPayload.toByteArray()); - } else { - String expectedBeginningMessage = new String(expected, StandardCharsets.UTF_8); - String actualMessage = byteStringPayload.toString(StandardCharsets.UTF_8); - return actualMessage.contains(expectedBeginningMessage); - } + return Arrays.equals(expected, byteStringPayload.toByteArray()); } private boolean isRetainMatched(Boolean retain) { diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java b/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java index 6c32f223b..95fcc76b6 100644 --- a/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java +++ b/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java @@ -58,14 +58,12 @@ import software.amazon.awssdk.utils.CollectionUtils; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -122,6 +120,7 @@ public class MqttControlSteps { private static final String DEFAULT_RESPONSE_TOPIC = null; private static final String DEFAULT_CORRELATION_DATA = null; private static final String DEFAULT_CONTENT_TYPE = null; + private static final String DEFAULT_LONG_MESSAGE = null; // subscribe efault properties private static final Integer DEFAULT_SUBSCRIPTION_ID = null; // NOTE: do not set for IoT Core broker !!! @@ -222,6 +221,9 @@ public class MqttControlSteps { /** Actual value of content type to receive in received messages. */ private String rxContentType = DEFAULT_CONTENT_TYPE; + /** Actual value of long messages. */ + private String txLongMessage = DEFAULT_LONG_MESSAGE; + private final Map> brokers = new HashMap<>(); private final Map mqttVersions = new HashMap<>(); @@ -942,18 +944,13 @@ public void publishLargeMessage(String clientDeviceId, String topicString, int q throw new RuntimeException("messageBeginning is larger than expected size"); } - byte[] randomMessageBytes = new byte[fullMessageSize - messageBeginning.length()]; - - new Random().nextBytes(randomMessageBytes); - String randomMessage = new String(randomMessageBytes, StandardCharsets.UTF_8); - - while (randomMessage.contains(messageBeginning)) { - new Random().nextBytes(randomMessageBytes); - randomMessage = new String(randomMessageBytes, StandardCharsets.UTF_8); + StringBuilder longMessageBuilder = new StringBuilder(); + for (int i = 0; i < fullMessageSize / messageBeginning.length(); i++) { + longMessageBuilder.append(messageBeginning); } + txLongMessage = longMessageBuilder.toString(); - String message = messageBeginning + randomMessage; - publish(clientDeviceId, topicString, qos, message, PublishReasonCode.SUCCESS.getValue()); + publish(clientDeviceId, topicString, qos, txLongMessage, PublishReasonCode.SUCCESS.getValue()); } /** @@ -1026,7 +1023,7 @@ public void publish(String clientDeviceId, String topicString, int qos, String m @And("message {string} is not received on {string} from {string} topic within {int} {word}") public void notReceivedMessage(String message, String clientDeviceId, String topicString, int value, String unit) throws TimeoutException, InterruptedException { - receive(message, clientDeviceId, topicString, value, unit, false, false); + receive(message, clientDeviceId, topicString, value, unit, false); } /** @@ -1045,7 +1042,7 @@ public void notReceivedMessage(String message, String clientDeviceId, String top @And("message {string} received on {string} from {string} topic within {int} {word}") public void receivedMessage(String message, String clientDeviceId, String topicString, int value, String unit) throws TimeoutException, InterruptedException { - receive(message, clientDeviceId, topicString, value, unit, true, true); + receive(message, clientDeviceId, topicString, value, unit, true); } @@ -1066,7 +1063,11 @@ public void receivedMessage(String message, String clientDeviceId, String topicS public void receivedMessageBeginning(String message, String clientDeviceId, String topicString, int value, String unit) throws TimeoutException, InterruptedException { - receive(message, clientDeviceId, topicString, value, unit, true, false); + if (!txLongMessage.contains(message)) { + log.error("Long message does not contain '{}'", message); + throw new RuntimeException("Long message does not contain expected beginning"); + } + receive(txLongMessage, clientDeviceId, topicString, value, unit, true); } /** @@ -1078,14 +1079,13 @@ public void receivedMessageBeginning(String message, String clientDeviceId, Stri * @param value the duration of time to wait for message * @param unit the time unit to wait * @param isExpectedMessage used for setting message expectation - * @param isMessageFull used for setting full message expectation * @throws TimeoutException when matched message was not received in specified duration of time * @throws RuntimeException on internal errors * @throws InterruptedException then thread has been interrupted */ @SuppressWarnings("PMD.UseObjectForClearerAPI") public void receive(String message, String clientDeviceId, String topicString, int value, - String unit, boolean isExpectedMessage, boolean isMessageFull) + String unit, boolean isExpectedMessage) throws TimeoutException, InterruptedException { // getting connectionControl by clientDeviceId final String clientDeviceThingName = getClientDeviceThingName(clientDeviceId); @@ -1106,7 +1106,6 @@ public void receive(String message, String clientDeviceId, String topicString, i .withMessageExpiryInterval(rxMessageExpiryInterval) .withResponseTopic(rxResponseTopic) .withCorrelationData(rxCorrelationData) - .withIsMessageFull(isMessageFull) .build(); // convert time units TimeUnit timeUnit = TimeUnit.valueOf(unit.toUpperCase()); From 520c53ffdecb48bb29f5cad551f619427799b9bf Mon Sep 17 00:00:00 2001 From: Uarbekov Date: Tue, 18 Jul 2023 19:24:53 +0600 Subject: [PATCH 3/5] fix(uat): fix generating long message by repeating --- .../greengrass/steps/MqttControlSteps.java | 54 +++++++++---------- .../greengrass/features/ggmq-1.feature | 4 +- 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java b/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java index 95fcc76b6..9773e9333 100644 --- a/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java +++ b/uat/testing-features/src/main/java/com/aws/greengrass/steps/MqttControlSteps.java @@ -120,7 +120,6 @@ public class MqttControlSteps { private static final String DEFAULT_RESPONSE_TOPIC = null; private static final String DEFAULT_CORRELATION_DATA = null; private static final String DEFAULT_CONTENT_TYPE = null; - private static final String DEFAULT_LONG_MESSAGE = null; // subscribe efault properties private static final Integer DEFAULT_SUBSCRIPTION_ID = null; // NOTE: do not set for IoT Core broker !!! @@ -221,9 +220,6 @@ public class MqttControlSteps { /** Actual value of content type to receive in received messages. */ private String rxContentType = DEFAULT_CONTENT_TYPE; - /** Actual value of long messages. */ - private String txLongMessage = DEFAULT_LONG_MESSAGE; - private final Map> brokers = new HashMap<>(); private final Map mqttVersions = new HashMap<>(); @@ -928,29 +924,17 @@ private void subscribe(@NonNull String clientDeviceId, @NonNull String topicFilt * @param topicString the topic to publish message * @param qos the value of MQTT QoS for publishing * @param messageBeginning the content of message with beginning to publish - * @param kbSize the size of message + * @param messageLength the length of message * @throws StatusRuntimeException on gRPC errors * @throws IllegalArgumentException on invalid QoS argument */ @When("I publish from {string} to {string} with qos {int} and large message with beginning of {string} " - + "with size {int} KB") + + "with length {int}") public void publishLargeMessage(String clientDeviceId, String topicString, int qos, - String messageBeginning, int kbSize) { - int lengthInBytes = kbSize * 1028; - // 1 char is 2 bytes - int fullMessageSize = lengthInBytes / 2; - - if (messageBeginning.length() > fullMessageSize) { - throw new RuntimeException("messageBeginning is larger than expected size"); - } - - StringBuilder longMessageBuilder = new StringBuilder(); - for (int i = 0; i < fullMessageSize / messageBeginning.length(); i++) { - longMessageBuilder.append(messageBeginning); - } - txLongMessage = longMessageBuilder.toString(); + String messageBeginning, int messageLength) { + String longMessage = generateLongMessage(messageBeginning, messageLength); - publish(clientDeviceId, topicString, qos, txLongMessage, PublishReasonCode.SUCCESS.getValue()); + publish(clientDeviceId, topicString, qos, longMessage, PublishReasonCode.SUCCESS.getValue()); } /** @@ -1049,7 +1033,8 @@ public void receivedMessage(String message, String clientDeviceId, String topicS /** * Verify is MQTT message is received in limited duration of time. * - * @param message content of message to receive + * @param message beginning of long message to receive + * @param messageLength the length of long message * @param clientDeviceId the user defined client device id * @param topicString the topic (not a filter) which message has been sent * @param value the duration of time to wait for message @@ -1059,15 +1044,13 @@ public void receivedMessage(String message, String clientDeviceId, String topicS * @throws InterruptedException then thread has been interrupted */ @SuppressWarnings("PMD.UseObjectForClearerAPI") - @And("message beginning with {string} received on {string} from {string} topic within {int} {word}") - public void receivedMessageBeginning(String message, String clientDeviceId, String topicString, int value, - String unit) + @And("message beginning with {string} and with length {int} received on {string} " + + "from {string} topic within {int} {word}") + public void receivedMessageBeginning(String message, int messageLength, String clientDeviceId, String topicString, + int value, String unit) throws TimeoutException, InterruptedException { - if (!txLongMessage.contains(message)) { - log.error("Long message does not contain '{}'", message); - throw new RuntimeException("Long message does not contain expected beginning"); - } - receive(txLongMessage, clientDeviceId, topicString, value, unit, true); + String longMessage = generateLongMessage(message, messageLength); + receive(longMessage, clientDeviceId, topicString, value, unit, true); } /** @@ -1263,6 +1246,17 @@ public void unsubscribe(String clientDeviceId, String filter) { log.info("MQTT topics filter {} has been unsubscribed", filter); } + private String generateLongMessage(String messageBeginning, int totalLength) { + StringBuilder longMessageBuilder = new StringBuilder(); + int repeatedTimes = totalLength / messageBeginning.length() + 1; + + for (int i = 0; i < repeatedTimes; i++) { + longMessageBuilder.append(messageBeginning); + } + + return longMessageBuilder.substring(0, totalLength); + } + private IotPolicySpec createDefaultClientDevicePolicy(String policyNameOverride) throws IOException { return iotSteps.createPolicy(DEFAULT_CLIENT_DEVICE_POLICY_CONFIG, policyNameOverride); } diff --git a/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature b/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature index 373cfcdde..1a5897d24 100644 --- a/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature +++ b/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature @@ -1297,8 +1297,8 @@ Feature: GGMQ-1 And I connect device "large_payload_publisher" on to "default_broker" using mqtt "" And I subscribe "large_payload_publisher" to "${large_payload_publisher}topic/to/iotcore" with qos 1 - When I publish from "large_payload_publisher" to "${large_payload_publisher}topic/to/iotcore" with qos 1 and large message with beginning of "Hello world1" with size 128 KB - Then message beginning with "Hello world1" received on "large_payload_publisher" from "${large_payload_publisher}topic/to/iotcore" topic within 10 seconds + When I publish from "large_payload_publisher" to "${large_payload_publisher}topic/to/iotcore" with qos 1 and large message with beginning of "Hello world1" with length 130098 + Then message beginning with "Hello world1" and with length 130098 received on "large_payload_publisher" from "${large_payload_publisher}topic/to/iotcore" topic within 10 seconds @mqtt3 @sdk-java Examples: From dfa284f5554966315e8154577cbbf4f3de7d068f Mon Sep 17 00:00:00 2001 From: Uarbekov Date: Wed, 19 Jul 2023 17:33:19 +0600 Subject: [PATCH 4/5] fix(uat): add comment to clarify scenario --- .../src/main/resources/greengrass/features/ggmq-1.feature | 3 +++ 1 file changed, 3 insertions(+) diff --git a/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature b/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature index 1a5897d24..be588a676 100644 --- a/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature +++ b/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature @@ -1297,6 +1297,9 @@ Feature: GGMQ-1 And I connect device "large_payload_publisher" on to "default_broker" using mqtt "" And I subscribe "large_payload_publisher" to "${large_payload_publisher}topic/to/iotcore" with qos 1 + # 130098 is 74 bytes short of 128KiB. However, the Moquette limit is frame size, not payload size. + # We should address this, but for now, we'll just decrease the payload slightly. + # NOTE: 74 bytes include a fixed header + variable length header which includes the topic name When I publish from "large_payload_publisher" to "${large_payload_publisher}topic/to/iotcore" with qos 1 and large message with beginning of "Hello world1" with length 130098 Then message beginning with "Hello world1" and with length 130098 received on "large_payload_publisher" from "${large_payload_publisher}topic/to/iotcore" topic within 10 seconds From 8ccc2d619b6aa64506a2dd3380adb6987d088bcd Mon Sep 17 00:00:00 2001 From: Uarbekov Date: Fri, 21 Jul 2023 13:37:12 +0600 Subject: [PATCH 5/5] fix(uat): add steps for t22 --- .../src/main/resources/greengrass/features/ggmq-1.feature | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature b/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature index be588a676..ccea727bd 100644 --- a/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature +++ b/uat/testing-features/src/main/resources/greengrass/features/ggmq-1.feature @@ -1303,6 +1303,10 @@ Feature: GGMQ-1 When I publish from "large_payload_publisher" to "${large_payload_publisher}topic/to/iotcore" with qos 1 and large message with beginning of "Hello world1" with length 130098 Then message beginning with "Hello world1" and with length 130098 received on "large_payload_publisher" from "${large_payload_publisher}topic/to/iotcore" topic within 10 seconds + And I subscribe "large_payload_publisher" to "topic_for_large_message" with qos 0 + When I publish from "large_payload_publisher" to "topic_for_large_message" with qos 1 and large message with beginning of "Message Larger than 128KB" with length 140000 + Then message beginning with "Message Larger than 128KB" and with length 140000 received on "large_payload_publisher" from "topic_for_large_message" topic within 10 seconds + @mqtt3 @sdk-java Examples: | mqtt-v | name | agent | recipe |