From a31da49c26e8dd81d9060d5f21c4c1b4ee7ef63f Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Tue, 31 Oct 2023 21:30:35 +0100 Subject: [PATCH 01/11] feat: add missing record properties Add missing properties like rejectionType, reason etc. Rework serialization, use kotlin serialization, impl specific serializer --- .../main/kotlin/io/zell/zdb/log/LogContent.kt | 20 +++++----- .../io/zell/zdb/log/LogContentReader.kt | 28 +++++++++++-- .../zell/zdb/log/records/ApplicationRecord.kt | 24 ++++++----- .../zell/zdb/log/records/IntentSerializer.kt | 40 +++++++++++++++++++ .../kotlin/io/zell/zdb/log/records/Record.kt | 21 ++++++++-- .../io/zell/zdb/log/records/RecordValue.kt | 18 --------- 6 files changed, 106 insertions(+), 45 deletions(-) create mode 100644 backend/src/main/kotlin/io/zell/zdb/log/records/IntentSerializer.kt delete mode 100644 backend/src/main/kotlin/io/zell/zdb/log/records/RecordValue.kt diff --git a/backend/src/main/kotlin/io/zell/zdb/log/LogContent.kt b/backend/src/main/kotlin/io/zell/zdb/log/LogContent.kt index 39656327..9a06be3e 100644 --- a/backend/src/main/kotlin/io/zell/zdb/log/LogContent.kt +++ b/backend/src/main/kotlin/io/zell/zdb/log/LogContent.kt @@ -56,17 +56,19 @@ class LogContent { .append("\\n").append(entry.intent.name()) if (entry.valueType == ValueType.PROCESS_INSTANCE) { - val piRelatedValue = entry.recordValue.piRelatedValue - piRelatedValue.bpmnElementType?.let { - content.append("\\n").append(it) - } + val piRelatedValue = entry.piRelatedValue + piRelatedValue?.let { + piRelatedValue.bpmnElementType?.let { + content.append("\\n").append(it) + } - piRelatedValue.processInstanceKey?.let { - content.append("\\nPI Key: ").append(it) - } + piRelatedValue.processInstanceKey?.let { + content.append("\\nPI Key: ").append(it) + } - piRelatedValue.processDefinitionKey?.let { - content.append("\\nPD Key: ").append(it) + piRelatedValue.processDefinitionKey?.let { + content.append("\\nPD Key: ").append(it) + } } } diff --git a/backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt b/backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt index e35d676c..a7897f25 100644 --- a/backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt +++ b/backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt @@ -19,6 +19,8 @@ import io.atomix.raft.storage.log.entry.SerializedApplicationEntry import io.camunda.zeebe.logstreams.impl.log.LoggedEventImpl import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter import io.camunda.zeebe.protocol.impl.record.RecordMetadata +import io.camunda.zeebe.protocol.record.RecordType +import io.camunda.zeebe.protocol.record.RejectionType import io.zell.zdb.log.records.* import kotlinx.serialization.json.Json import org.agrona.concurrent.UnsafeBuffer @@ -111,11 +113,17 @@ class LogContentReader(logPath: Path) : Iterator { metadata.recordType, metadata.valueType, metadata.intent, + metadata.rejectionType, + metadata.rejectionReason, + metadata.requestId, + metadata.requestStreamId, + metadata.protocolVersion, metadata.brokerVersion.toString(), metadata.recordVersion, - RecordValue(valueJson, - json.decodeFromString(valueJson) ) - ) + metadata.authorization.authData.toString(), + Json.decodeFromString(valueJson), + json.decodeFromString(valueJson) ) + applicationRecord.entries.add(parsedRecord) @@ -153,11 +161,23 @@ class LogContentReader(logPath: Path) : Iterator { applicationRecordFilter = { record : ApplicationRecord -> record.entries.asSequence() - .map { it.recordValue.piRelatedValue } + .map { it.piRelatedValue } + .filter { it != null } + .map { it!! } .filter { it.processInstanceKey != null } .asStream() .map { it.processInstanceKey } .anyMatch(instanceKey::equals) } } + + fun filterForRejections() { + applicationRecordFilter = { + record : ApplicationRecord -> + record.entries.asSequence() + .map { it.recordType } + .any { it == RecordType.COMMAND_REJECTION } + } + } + } diff --git a/backend/src/main/kotlin/io/zell/zdb/log/records/ApplicationRecord.kt b/backend/src/main/kotlin/io/zell/zdb/log/records/ApplicationRecord.kt index 1545b49f..66b0b831 100644 --- a/backend/src/main/kotlin/io/zell/zdb/log/records/ApplicationRecord.kt +++ b/backend/src/main/kotlin/io/zell/zdb/log/records/ApplicationRecord.kt @@ -58,18 +58,20 @@ class ApplicationRecord(val index: Long, val term: Long, val highestPosition: Lo .append(separator) .append(record.intent) - val piRelatedValue = record.recordValue.piRelatedValue - piRelatedValue.processInstanceKey?.let { - stringBuilder - .append(separator) - .append(it) - .append(separator) - } + val piRelatedValue = record.piRelatedValue + piRelatedValue?.let { + piRelatedValue.processInstanceKey?.let { + stringBuilder + .append(separator) + .append(it) + .append(separator) + } - piRelatedValue.bpmnElementType?.let { - stringBuilder - .append(it) - .append(separator) + piRelatedValue.bpmnElementType?.let { + stringBuilder + .append(it) + .append(separator) + } } return stringBuilder.toString() } diff --git a/backend/src/main/kotlin/io/zell/zdb/log/records/IntentSerializer.kt b/backend/src/main/kotlin/io/zell/zdb/log/records/IntentSerializer.kt new file mode 100644 index 00000000..73b036e4 --- /dev/null +++ b/backend/src/main/kotlin/io/zell/zdb/log/records/IntentSerializer.kt @@ -0,0 +1,40 @@ +/* + * Copyright © 2021 Christopher Kujawa (zelldon91@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.zell.zdb.log.records + +import io.camunda.zeebe.protocol.record.intent.Intent +import kotlinx.serialization.KSerializer +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.descriptors.buildClassSerialDescriptor +import kotlinx.serialization.descriptors.element +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder + +class IntentSerializer : KSerializer { + override fun deserialize(decoder: Decoder): Intent { + // there is currently not easy way to deserialize, and we also don't need + TODO("Not yet implemented") + } + + override val descriptor: SerialDescriptor + = buildClassSerialDescriptor("Intent") { + element("intent") + } + + override fun serialize(encoder: Encoder, value: Intent) { + encoder.encodeString(value.name()) + } +} \ No newline at end of file diff --git a/backend/src/main/kotlin/io/zell/zdb/log/records/Record.kt b/backend/src/main/kotlin/io/zell/zdb/log/records/Record.kt index b8497885..209c4ee4 100644 --- a/backend/src/main/kotlin/io/zell/zdb/log/records/Record.kt +++ b/backend/src/main/kotlin/io/zell/zdb/log/records/Record.kt @@ -16,22 +16,37 @@ package io.zell.zdb.log.records import io.camunda.zeebe.protocol.record.RecordType +import io.camunda.zeebe.protocol.record.RejectionType import io.camunda.zeebe.protocol.record.ValueType import io.camunda.zeebe.protocol.record.intent.Intent +import kotlinx.serialization.Serializable +import kotlinx.serialization.Transient +import kotlinx.serialization.encodeToString +import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonElement +@Serializable data class Record(val position: Long, val sourceRecordPosition: Long, val timestamp: Long, val key: Long, val recordType: RecordType, val valueType: ValueType, + @Serializable(with = IntentSerializer::class) val intent: Intent, + val rejectionType: RejectionType? = RejectionType.NULL_VAL, + val rejectionReason: String? = "", + val requestId: Long? = 0, + val requestStreamId: Int = 0, + val protocolVersion: Int, val brokerVersion: String, val recordVersion: Int, - val recordValue: RecordValue - + val authData: String, + val recordValue: JsonElement, + /*Transient marks to ignore the property during serialization */ + @Transient val piRelatedValue: ProcessInstanceRelatedValue? = null ) { override fun toString(): String { - return """{"key": $key, "position": $position, "sourceRecordPosition": $sourceRecordPosition, "intent": "$intent", "recordType": "$recordType", "valueType": "$valueType", "timestamp": $timestamp, "recordVersion": $recordVersion, "brokerVersion": "$brokerVersion", "value": ${recordValue.valueJson}}""" + return Json.encodeToString(this) } } \ No newline at end of file diff --git a/backend/src/main/kotlin/io/zell/zdb/log/records/RecordValue.kt b/backend/src/main/kotlin/io/zell/zdb/log/records/RecordValue.kt deleted file mode 100644 index 736fd2b3..00000000 --- a/backend/src/main/kotlin/io/zell/zdb/log/records/RecordValue.kt +++ /dev/null @@ -1,18 +0,0 @@ -/* - * Copyright © 2021 Christopher Kujawa (zelldon91@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.zell.zdb.log.records - -data class RecordValue(val valueJson: String, val piRelatedValue: ProcessInstanceRelatedValue) \ No newline at end of file From 112032f1cce649bd7559ff459475850702de119c Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Tue, 31 Oct 2023 22:05:28 +0100 Subject: [PATCH 02/11] test: adjust latest tests --- .../test/kotlin/io/zell/zdb/TestUtils.java | 1 + .../io/zell/zdb/ZeebeContentCreator.java | 24 ++- .../io/zell/zdb/latest/VersionLatestTest.java | 140 +++++++++++++++--- 3 files changed, 139 insertions(+), 26 deletions(-) diff --git a/backend/src/test/kotlin/io/zell/zdb/TestUtils.java b/backend/src/test/kotlin/io/zell/zdb/TestUtils.java index 842e6124..f6e6f5b5 100644 --- a/backend/src/test/kotlin/io/zell/zdb/TestUtils.java +++ b/backend/src/test/kotlin/io/zell/zdb/TestUtils.java @@ -19,6 +19,7 @@ public final class TestUtils { public static final String CONTAINER_PATH = "/usr/local/zeebe/data/"; + public static final String TIMESTAMP_REGEX = "\"timestamp\":[0-9]+,"; private static final String TMP_FOLDER_FORMAT = "data-%s-%d"; private TestUtils() {} diff --git a/backend/src/test/kotlin/io/zell/zdb/ZeebeContentCreator.java b/backend/src/test/kotlin/io/zell/zdb/ZeebeContentCreator.java index 2c26a565..a3c240c8 100644 --- a/backend/src/test/kotlin/io/zell/zdb/ZeebeContentCreator.java +++ b/backend/src/test/kotlin/io/zell/zdb/ZeebeContentCreator.java @@ -20,6 +20,7 @@ import io.camunda.zeebe.client.api.response.ProcessInstanceEvent; import io.camunda.zeebe.model.bpmn.Bpmn; import io.camunda.zeebe.model.bpmn.BpmnModelInstance; + import java.time.Duration; import java.util.Map; @@ -33,15 +34,15 @@ public ZeebeContentCreator(BpmnModelInstance processModel) { } private static final BpmnModelInstance SIMPLE_PROCESS = Bpmn.createExecutableProcess("simple") - .startEvent() - .endEvent() - .done(); + .startEvent() + .endEvent() + .done(); public DeploymentEvent deploymentEvent; public ProcessInstanceEvent processInstanceEvent; - public void createContent(String gatewayAddress) { - createContent(gatewayAddress, 1); + public void createContent(String gatewayAddress) { + createContent(gatewayAddress, 1); } public void createLargeContent(String gatewayAddress) { @@ -54,6 +55,7 @@ public void createContent(String gatewayAddress, final int amountOfMessages) { .usePlaintext() .build(); + deploymentEvent = client.newDeployResourceCommand() .addProcessModel(processModel, "process.bpmn") .addProcessModel(SIMPLE_PROCESS, "simple.bpmn") @@ -99,6 +101,18 @@ public void createContent(String gatewayAddress, final int amountOfMessages) { } } while (responseJobKey <= 0); + try { + // cause rejection + client + .newCreateInstanceCommand() + .bpmnProcessId("nonExisting") + .latestVersion() + .send() + .join(); + } catch (Exception ex) { + // ignore - expected + } + client.close(); } diff --git a/backend/src/test/kotlin/io/zell/zdb/latest/VersionLatestTest.java b/backend/src/test/kotlin/io/zell/zdb/latest/VersionLatestTest.java index 4069e70a..c4334649 100644 --- a/backend/src/test/kotlin/io/zell/zdb/latest/VersionLatestTest.java +++ b/backend/src/test/kotlin/io/zell/zdb/latest/VersionLatestTest.java @@ -22,6 +22,8 @@ import io.camunda.zeebe.model.bpmn.Bpmn; import io.camunda.zeebe.model.bpmn.BpmnModelInstance; import io.camunda.zeebe.protocol.ZbColumnFamilies; +import io.camunda.zeebe.protocol.record.RejectionType; +import io.camunda.zeebe.protocol.record.ValueType; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; import io.camunda.zeebe.protocol.record.value.BpmnElementType; import io.camunda.zeebe.protocol.record.value.ErrorType; @@ -42,7 +44,6 @@ import io.zell.zdb.state.incident.IncidentState; import io.zell.zdb.state.instance.InstanceState; import io.zell.zdb.state.process.ProcessState; -import io.zell.zdb.v81.Version81Test; import org.agrona.concurrent.UnsafeBuffer; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.AfterAll; @@ -61,11 +62,10 @@ import java.io.File; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Base64; -import java.util.HashMap; -import java.util.List; +import java.util.*; +import java.util.stream.StreamSupport; +import static io.zell.zdb.TestUtils.TIMESTAMP_REGEX; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -132,9 +132,9 @@ public void shouldReadStatusFromLog() { final var status = logStatus.status(); // then - assertThat(status.getHighestIndex()).isEqualTo(211); + assertThat(status.getHighestIndex()).isEqualTo(213); assertThat(status.getHighestTerm()).isEqualTo(1); - assertThat(status.getHighestRecordPosition()).isEqualTo(258); + assertThat(status.getHighestRecordPosition()).isEqualTo(260); assertThat(status.getLowestIndex()).isEqualTo(1); assertThat(status.getLowestRecordPosition()).isEqualTo(1); @@ -187,9 +187,9 @@ public void shouldReadStatusFromLog() { final var status = logStatus.status(); // then - assertThat(status.getHighestIndex()).isEqualTo(13); + assertThat(status.getHighestIndex()).isEqualTo(15); assertThat(status.getHighestTerm()).isEqualTo(1); - assertThat(status.getHighestRecordPosition()).isEqualTo(60); + assertThat(status.getHighestRecordPosition()).isEqualTo(62); assertThat(status.getLowestIndex()).isEqualTo(1); assertThat(status.getLowestRecordPosition()).isEqualTo(1); @@ -243,6 +243,104 @@ public void shouldReadLogContentWithIterator() { verifyCompleteLog(records); } + @Test + public void shouldReadRejection() { + // given + final var logPath = ZeebePaths.Companion.getLogPath(TEMP_DIR, "1"); + var logContentReader = new LogContentReader(logPath); + + // when + logContentReader.filterForRejections(); + + // then + final var rejection = StreamSupport.stream(Spliterators.spliteratorUnknownSize(logContentReader, Spliterator.ORDERED), false) + .filter(persistedRecord -> persistedRecord instanceof ApplicationRecord) + .map(persistedRecord -> (ApplicationRecord) persistedRecord) + .flatMap(applicationRecord -> applicationRecord.getEntries().stream()) + .filter(record -> record.component8() != RejectionType.NULL_VAL) + .findFirst(); + + assertThat(rejection).isPresent(); + assertThat(rejection.get().component8()).isEqualTo(RejectionType.NOT_FOUND); + assertThat(rejection.get().component9()).isEqualTo("Expected to find process definition with process ID 'nonExisting', but none found"); + } + + @Test + public void shouldSerializeRejectionToJson() throws JsonProcessingException { + // given + final var expectedJson = OBJECT_MAPPER.readTree(""" + {"position":62,"sourceRecordPosition":61,"key":-1,"recordType":"COMMAND_REJECTION", + "valueType":"PROCESS_INSTANCE_CREATION","intent":"CREATE","rejectionType":"NOT_FOUND", + "rejectionReason":"Expected to find process definition with process ID 'nonExisting', but none found", + "requestId":-1,"requestStreamId":-2147483648,"protocolVersion":4,"brokerVersion":"8.4.0", + "recordVersion":1,"authData":"", + "recordValue":{"bpmnProcessId":"nonExisting","processDefinitionKey":0,"processInstanceKey":-1, + "version":-1,"variables":"gA==","fetchVariables":[], + "startInstructions":[],"tenantId":""}} + } +"""); + final var logPath = ZeebePaths.Companion.getLogPath(TEMP_DIR, "1"); + var logContentReader = new LogContentReader(logPath); + + // when + logContentReader.filterForRejections(); + + // then + final var rejection = StreamSupport.stream(Spliterators.spliteratorUnknownSize(logContentReader, Spliterator.ORDERED), false) + .filter(persistedRecord -> persistedRecord instanceof ApplicationRecord) + .map(persistedRecord -> (ApplicationRecord) persistedRecord) + .flatMap(applicationRecord -> applicationRecord.getEntries().stream()) + .filter(record -> !record.component8().equals(RejectionType.NULL_VAL.name())) + .findFirst(); + + assertThat(rejection).isPresent(); + assertThat(rejection.get().component8()).isEqualTo(RejectionType.NOT_FOUND); + assertThat(rejection.get().component9()).isEqualTo("Expected to find process definition with process ID 'nonExisting', but none found"); + + final var recordJson = rejection.get().toString() + .replaceFirst(TIMESTAMP_REGEX, ""); + final var actualJson = OBJECT_MAPPER.readTree(recordJson); + assertThat(actualJson).isNotNull(); // is valid json + assertThat(actualJson).isEqualTo(expectedJson); + } + + @Test + public void shouldSerializeRecordToJson() throws JsonProcessingException { + // given + final var expectedJson = OBJECT_MAPPER.readTree(""" + {"position":12,"sourceRecordPosition":5,"key":2251799813685252,"recordType":"EVENT", + "valueType":"PROCESS_INSTANCE","intent":"ELEMENT_ACTIVATED","requestId":-1, + "requestStreamId":-2147483648,"protocolVersion":4,"brokerVersion":"8.4.0","recordVersion":1, + "authData":"", + "recordValue":{"bpmnElementType":"PROCESS","elementId":"process","bpmnProcessId":"process", + "version":1,"processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685252, + "flowScopeKey":-1,"bpmnEventType":"UNSPECIFIED","parentProcessInstanceKey":-1, + "parentElementInstanceKey":-1,"tenantId":""}} +"""); + final var logPath = ZeebePaths.Companion.getLogPath(TEMP_DIR, "1"); + var logContentReader = new LogContentReader(logPath); + logContentReader.filterForProcessInstance(zeebeContentCreator.processInstanceEvent.getProcessInstanceKey()); + + // when + final var piActivated = StreamSupport.stream(Spliterators.spliteratorUnknownSize(logContentReader, Spliterator.ORDERED), false) + .filter(persistedRecord -> persistedRecord instanceof ApplicationRecord) + .map(persistedRecord -> (ApplicationRecord) persistedRecord) + .flatMap(applicationRecord -> applicationRecord.getEntries().stream()) + .filter(record -> record.component6() == ValueType.PROCESS_INSTANCE) + .filter(record -> record.component7() == ProcessInstanceIntent.ELEMENT_ACTIVATED) + .filter(record -> record.getPiRelatedValue() != null) + .filter(record -> record.getPiRelatedValue().getBpmnElementType() == BpmnElementType.PROCESS) + .findFirst(); + + // then + assertThat(piActivated).isPresent(); + final var recordJson = piActivated.get().toString() + .replaceFirst(TIMESTAMP_REGEX, ""); + final var actualJson = OBJECT_MAPPER.readTree(recordJson); + assertThat(actualJson).isNotNull(); // is valid json + assertThat(actualJson).isEqualTo(expectedJson); + } + @Test public void shouldSkipFirstPartOfLog() { // given @@ -255,13 +353,13 @@ public void shouldSkipFirstPartOfLog() { logContentReader.forEachRemaining(records::add); // then - assertThat(records).hasSize(9); + assertThat(records).hasSize(11); // we skip the first raft record assertThat(records.stream().filter(RaftRecord.class::isInstance).count()).isEqualTo(0); - assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(9); + assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(11); final var maxIndex = records.stream().map(PersistedRecord::index).max(Long::compareTo).get(); - assertThat(maxIndex).isEqualTo(13); + assertThat(maxIndex).isEqualTo(15); final var minIndex = records.stream().map(PersistedRecord::index).min(Long::compareTo).get(); assertThat(minIndex).isEqualTo(5); @@ -271,7 +369,7 @@ public void shouldSkipFirstPartOfLog() { .map(ApplicationRecord::getHighestPosition) .max(Long::compareTo) .orElseThrow(); - assertThat(maxPosition).isEqualTo(60); + assertThat(maxPosition).isEqualTo(62); final var minPosition = records.stream() .filter(ApplicationRecord.class::isInstance) .map(ApplicationRecord.class::cast) @@ -328,9 +426,9 @@ public void shouldSeekToEndOfLogIfNoExistingSeek() { assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(1); final var maxIndex = records.stream().map(PersistedRecord::index).max(Long::compareTo).get(); - assertThat(maxIndex).isEqualTo(13); + assertThat(maxIndex).isEqualTo(15); final var minIndex = records.stream().map(PersistedRecord::index).min(Long::compareTo).get(); - assertThat(minIndex).isEqualTo(13); + assertThat(minIndex).isEqualTo(15); final var maxPosition = records.stream() .filter(ApplicationRecord.class::isInstance) @@ -338,14 +436,14 @@ public void shouldSeekToEndOfLogIfNoExistingSeek() { .map(ApplicationRecord::getHighestPosition) .max(Long::compareTo) .orElseThrow(); - assertThat(maxPosition).isEqualTo(60); + assertThat(maxPosition).isEqualTo(62); final var minPosition = records.stream() .filter(ApplicationRecord.class::isInstance) .map(ApplicationRecord.class::cast) .map(ApplicationRecord::getLowestPosition) .min(Long::compareTo) .orElseThrow(); - assertThat(minPosition).isEqualTo(60); + assertThat(minPosition).isEqualTo(62); } @Test @@ -604,12 +702,12 @@ public void shouldFilterWithProcessInstanceKeyAndSetBeginAndEndOfLogPosition() { } private static void verifyCompleteLog(List records) { - assertThat(records).hasSize(13); + assertThat(records).hasSize(15); assertThat(records.stream().filter(RaftRecord.class::isInstance).count()).isEqualTo(1); - assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(12); + assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(14); final var maxIndex = records.stream().map(PersistedRecord::index).max(Long::compareTo).get(); - assertThat(maxIndex).isEqualTo(13); + assertThat(maxIndex).isEqualTo(15); final var minIndex = records.stream().map(PersistedRecord::index).min(Long::compareTo).get(); assertThat(minIndex).isEqualTo(1); @@ -619,7 +717,7 @@ private static void verifyCompleteLog(List records) { .map(ApplicationRecord::getHighestPosition) .max(Long::compareTo) .orElseThrow(); - assertThat(maxPosition).isEqualTo(60); + assertThat(maxPosition).isEqualTo(62); final var minPosition = records.stream() .filter(ApplicationRecord.class::isInstance) .map(ApplicationRecord.class::cast) From 264c8b2e6a034b28c4e9e55f4cb303b9fadd9e8a Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Tue, 31 Oct 2023 22:06:34 +0100 Subject: [PATCH 03/11] test: adjust 8.3 tests --- .../kotlin/io/zell/zdb/v83/Version83Test.java | 158 ++++++++++++++---- 1 file changed, 128 insertions(+), 30 deletions(-) diff --git a/backend/src/test/kotlin/io/zell/zdb/v83/Version83Test.java b/backend/src/test/kotlin/io/zell/zdb/v83/Version83Test.java index f415eacd..d33a79d6 100644 --- a/backend/src/test/kotlin/io/zell/zdb/v83/Version83Test.java +++ b/backend/src/test/kotlin/io/zell/zdb/v83/Version83Test.java @@ -22,6 +22,8 @@ import io.camunda.zeebe.model.bpmn.Bpmn; import io.camunda.zeebe.model.bpmn.BpmnModelInstance; import io.camunda.zeebe.protocol.ZbColumnFamilies; +import io.camunda.zeebe.protocol.record.RejectionType; +import io.camunda.zeebe.protocol.record.ValueType; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; import io.camunda.zeebe.protocol.record.value.BpmnElementType; import io.camunda.zeebe.protocol.record.value.ErrorType; @@ -30,6 +32,7 @@ import io.zell.zdb.TestUtils; import io.zell.zdb.ZeebeContentCreator; import io.zell.zdb.ZeebePaths; +import io.zell.zdb.latest.VersionLatestTest; import io.zell.zdb.log.LogContentReader; import io.zell.zdb.log.LogSearch; import io.zell.zdb.log.LogStatus; @@ -42,7 +45,6 @@ import io.zell.zdb.state.incident.IncidentState; import io.zell.zdb.state.instance.InstanceState; import io.zell.zdb.state.process.ProcessState; -import io.zell.zdb.v82.Version82Test; import org.agrona.concurrent.UnsafeBuffer; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.AfterAll; @@ -61,11 +63,10 @@ import java.io.File; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Base64; -import java.util.HashMap; -import java.util.List; +import java.util.*; +import java.util.stream.StreamSupport; +import static io.zell.zdb.TestUtils.TIMESTAMP_REGEX; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -94,8 +95,8 @@ public class Version83Test { public class LargeLogTest { // earlier ZDB versions failed on large logs, because of segments were async created and incomplete private static final Logger LOGGER = - LoggerFactory.getLogger(LargeLogTest.class); - private static final File TEMP_DIR = TestUtils.newTmpFolder(LargeLogTest.class); + LoggerFactory.getLogger(VersionLatestTest.LargeLogTest.class); + private static final File TEMP_DIR = TestUtils.newTmpFolder(VersionLatestTest.LargeLogTest.class); private static final ZeebeContentCreator zeebeContentCreator = new ZeebeContentCreator(PROCESS); @Container public static ZeebeContainer zeebeContainer = new ZeebeContainer(DOCKER_IMAGE) @@ -132,9 +133,9 @@ public void shouldReadStatusFromLog() { final var status = logStatus.status(); // then - assertThat(status.getHighestIndex()).isEqualTo(211); + assertThat(status.getHighestIndex()).isEqualTo(213); assertThat(status.getHighestTerm()).isEqualTo(1); - assertThat(status.getHighestRecordPosition()).isEqualTo(258); + assertThat(status.getHighestRecordPosition()).isEqualTo(260); assertThat(status.getLowestIndex()).isEqualTo(1); assertThat(status.getLowestRecordPosition()).isEqualTo(1); @@ -146,13 +147,12 @@ public void shouldReadStatusFromLog() { .contains("lowestIndex"); } } - @Nested public class ZeebeLogTest { private static final Logger LOGGER = - LoggerFactory.getLogger(ZeebeLogTest.class); - private static final File TEMP_DIR = TestUtils.newTmpFolder(ZeebeLogTest.class); + LoggerFactory.getLogger(VersionLatestTest.ZeebeLogTest.class); + private static final File TEMP_DIR = TestUtils.newTmpFolder(VersionLatestTest.ZeebeLogTest.class); private static final ZeebeContentCreator zeebeContentCreator = new ZeebeContentCreator(PROCESS); @Container public static ZeebeContainer zeebeContainer = new ZeebeContainer(DOCKER_IMAGE) @@ -188,9 +188,9 @@ public void shouldReadStatusFromLog() { final var status = logStatus.status(); // then - assertThat(status.getHighestIndex()).isEqualTo(13); + assertThat(status.getHighestIndex()).isEqualTo(15); assertThat(status.getHighestTerm()).isEqualTo(1); - assertThat(status.getHighestRecordPosition()).isEqualTo(60); + assertThat(status.getHighestRecordPosition()).isEqualTo(62); assertThat(status.getLowestIndex()).isEqualTo(1); assertThat(status.getLowestRecordPosition()).isEqualTo(1); @@ -244,6 +244,104 @@ public void shouldReadLogContentWithIterator() { verifyCompleteLog(records); } + @Test + public void shouldReadRejection() { + // given + final var logPath = ZeebePaths.Companion.getLogPath(TEMP_DIR, "1"); + var logContentReader = new LogContentReader(logPath); + + // when + logContentReader.filterForRejections(); + + // then + final var rejection = StreamSupport.stream(Spliterators.spliteratorUnknownSize(logContentReader, Spliterator.ORDERED), false) + .filter(persistedRecord -> persistedRecord instanceof ApplicationRecord) + .map(persistedRecord -> (ApplicationRecord) persistedRecord) + .flatMap(applicationRecord -> applicationRecord.getEntries().stream()) + .filter(record -> record.component8() != RejectionType.NULL_VAL) + .findFirst(); + + assertThat(rejection).isPresent(); + assertThat(rejection.get().component8()).isEqualTo(RejectionType.NOT_FOUND); + assertThat(rejection.get().component9()).isEqualTo("Expected to find process definition with process ID 'nonExisting', but none found"); + } + + @Test + public void shouldSerializeRejectionToJson() throws JsonProcessingException { + // given + final var expectedJson = OBJECT_MAPPER.readTree(""" + {"position":62,"sourceRecordPosition":61,"key":-1,"recordType":"COMMAND_REJECTION", + "valueType":"PROCESS_INSTANCE_CREATION","intent":"CREATE","rejectionType":"NOT_FOUND", + "rejectionReason":"Expected to find process definition with process ID 'nonExisting', but none found", + "requestId":-1,"requestStreamId":-2147483648,"protocolVersion":4,"brokerVersion":"8.3.0", + "recordVersion":1,"authData":"", + "recordValue":{"bpmnProcessId":"nonExisting","processDefinitionKey":0,"processInstanceKey":-1, + "version":-1,"variables":"gA==","fetchVariables":[], + "startInstructions":[],"tenantId":""}} + } +"""); + final var logPath = ZeebePaths.Companion.getLogPath(TEMP_DIR, "1"); + var logContentReader = new LogContentReader(logPath); + + // when + logContentReader.filterForRejections(); + + // then + final var rejection = StreamSupport.stream(Spliterators.spliteratorUnknownSize(logContentReader, Spliterator.ORDERED), false) + .filter(persistedRecord -> persistedRecord instanceof ApplicationRecord) + .map(persistedRecord -> (ApplicationRecord) persistedRecord) + .flatMap(applicationRecord -> applicationRecord.getEntries().stream()) + .filter(record -> !record.component8().equals(RejectionType.NULL_VAL.name())) + .findFirst(); + + assertThat(rejection).isPresent(); + assertThat(rejection.get().component8()).isEqualTo(RejectionType.NOT_FOUND); + assertThat(rejection.get().component9()).isEqualTo("Expected to find process definition with process ID 'nonExisting', but none found"); + + final var recordJson = rejection.get().toString() + .replaceFirst(TIMESTAMP_REGEX, ""); + final var actualJson = OBJECT_MAPPER.readTree(recordJson); + assertThat(actualJson).isNotNull(); // is valid json + assertThat(actualJson).isEqualTo(expectedJson); + } + + @Test + public void shouldSerializeRecordToJson() throws JsonProcessingException { + // given + final var expectedJson = OBJECT_MAPPER.readTree(""" + {"position":12,"sourceRecordPosition":5,"key":2251799813685252,"recordType":"EVENT", + "valueType":"PROCESS_INSTANCE","intent":"ELEMENT_ACTIVATED","requestId":-1, + "requestStreamId":-2147483648,"protocolVersion":4,"brokerVersion":"8.3.0","recordVersion":1, + "authData":"", + "recordValue":{"bpmnElementType":"PROCESS","elementId":"process","bpmnProcessId":"process", + "version":1,"processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685252, + "flowScopeKey":-1,"bpmnEventType":"UNSPECIFIED","parentProcessInstanceKey":-1, + "parentElementInstanceKey":-1,"tenantId":""}} +"""); + final var logPath = ZeebePaths.Companion.getLogPath(TEMP_DIR, "1"); + var logContentReader = new LogContentReader(logPath); + logContentReader.filterForProcessInstance(zeebeContentCreator.processInstanceEvent.getProcessInstanceKey()); + + // when + final var piActivated = StreamSupport.stream(Spliterators.spliteratorUnknownSize(logContentReader, Spliterator.ORDERED), false) + .filter(persistedRecord -> persistedRecord instanceof ApplicationRecord) + .map(persistedRecord -> (ApplicationRecord) persistedRecord) + .flatMap(applicationRecord -> applicationRecord.getEntries().stream()) + .filter(record -> record.component6() == ValueType.PROCESS_INSTANCE) + .filter(record -> record.component7() == ProcessInstanceIntent.ELEMENT_ACTIVATED) + .filter(record -> record.getPiRelatedValue() != null) + .filter(record -> record.getPiRelatedValue().getBpmnElementType() == BpmnElementType.PROCESS) + .findFirst(); + + // then + assertThat(piActivated).isPresent(); + final var recordJson = piActivated.get().toString() + .replaceFirst(TIMESTAMP_REGEX, ""); + final var actualJson = OBJECT_MAPPER.readTree(recordJson); + assertThat(actualJson).isNotNull(); // is valid json + assertThat(actualJson).isEqualTo(expectedJson); + } + @Test public void shouldSkipFirstPartOfLog() { // given @@ -256,13 +354,13 @@ public void shouldSkipFirstPartOfLog() { logContentReader.forEachRemaining(records::add); // then - assertThat(records).hasSize(9); + assertThat(records).hasSize(11); // we skip the first raft record assertThat(records.stream().filter(RaftRecord.class::isInstance).count()).isEqualTo(0); - assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(9); + assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(11); final var maxIndex = records.stream().map(PersistedRecord::index).max(Long::compareTo).get(); - assertThat(maxIndex).isEqualTo(13); + assertThat(maxIndex).isEqualTo(15); final var minIndex = records.stream().map(PersistedRecord::index).min(Long::compareTo).get(); assertThat(minIndex).isEqualTo(5); @@ -272,7 +370,7 @@ public void shouldSkipFirstPartOfLog() { .map(ApplicationRecord::getHighestPosition) .max(Long::compareTo) .orElseThrow(); - assertThat(maxPosition).isEqualTo(60); + assertThat(maxPosition).isEqualTo(62); final var minPosition = records.stream() .filter(ApplicationRecord.class::isInstance) .map(ApplicationRecord.class::cast) @@ -329,9 +427,9 @@ public void shouldSeekToEndOfLogIfNoExistingSeek() { assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(1); final var maxIndex = records.stream().map(PersistedRecord::index).max(Long::compareTo).get(); - assertThat(maxIndex).isEqualTo(13); + assertThat(maxIndex).isEqualTo(15); final var minIndex = records.stream().map(PersistedRecord::index).min(Long::compareTo).get(); - assertThat(minIndex).isEqualTo(13); + assertThat(minIndex).isEqualTo(15); final var maxPosition = records.stream() .filter(ApplicationRecord.class::isInstance) @@ -339,14 +437,14 @@ public void shouldSeekToEndOfLogIfNoExistingSeek() { .map(ApplicationRecord::getHighestPosition) .max(Long::compareTo) .orElseThrow(); - assertThat(maxPosition).isEqualTo(60); + assertThat(maxPosition).isEqualTo(62); final var minPosition = records.stream() .filter(ApplicationRecord.class::isInstance) .map(ApplicationRecord.class::cast) .map(ApplicationRecord::getLowestPosition) .min(Long::compareTo) .orElseThrow(); - assertThat(minPosition).isEqualTo(60); + assertThat(minPosition).isEqualTo(62); } @Test @@ -605,12 +703,12 @@ public void shouldFilterWithProcessInstanceKeyAndSetBeginAndEndOfLogPosition() { } private static void verifyCompleteLog(List records) { - assertThat(records).hasSize(13); + assertThat(records).hasSize(15); assertThat(records.stream().filter(RaftRecord.class::isInstance).count()).isEqualTo(1); - assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(12); + assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(14); final var maxIndex = records.stream().map(PersistedRecord::index).max(Long::compareTo).get(); - assertThat(maxIndex).isEqualTo(13); + assertThat(maxIndex).isEqualTo(15); final var minIndex = records.stream().map(PersistedRecord::index).min(Long::compareTo).get(); assertThat(minIndex).isEqualTo(1); @@ -620,7 +718,7 @@ private static void verifyCompleteLog(List records) { .map(ApplicationRecord::getHighestPosition) .max(Long::compareTo) .orElseThrow(); - assertThat(maxPosition).isEqualTo(60); + assertThat(maxPosition).isEqualTo(62); final var minPosition = records.stream() .filter(ApplicationRecord.class::isInstance) .map(ApplicationRecord.class::cast) @@ -671,7 +769,7 @@ public void shouldSearchPositionInLog() { final var position = 1; // when - final io.zell.zdb.log.records.Record record = logSearch.searchPosition(position); + final Record record = logSearch.searchPosition(position); // then assertThat(record).isNotNull(); @@ -685,7 +783,7 @@ public void shouldReturnNullOnNegPosition() { var logSearch = new LogSearch(logPath); // when - final io.zell.zdb.log.records.Record record = logSearch.searchPosition(-1); + final Record record = logSearch.searchPosition(-1); // then assertThat(record).isNull(); @@ -698,7 +796,7 @@ public void shouldReturnNullOnToBigPosition() { var logSearch = new LogSearch(logPath); // when - final io.zell.zdb.log.records.Record record = logSearch.searchPosition(Long.MAX_VALUE); + final Record record = logSearch.searchPosition(Long.MAX_VALUE); // then assertThat(record).isNull(); @@ -733,7 +831,7 @@ final var record = logSearch.searchIndex(index); assertThat(record) .asInstanceOf(InstanceOfAssertFactories.type(ApplicationRecord.class)) .extracting(ApplicationRecord::getEntries) - .asInstanceOf(InstanceOfAssertFactories.list(io.zell.zdb.log.records.Record.class)) + .asInstanceOf(InstanceOfAssertFactories.list(Record.class)) .extracting(Record::getPosition) .doesNotHaveDuplicates(); } From 0cb62068e8a77eb89f1314af755f6c797aebf97a Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Tue, 31 Oct 2023 22:13:53 +0100 Subject: [PATCH 04/11] test: adjust imports --- .../src/test/kotlin/io/zell/zdb/v83/Version83Test.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/backend/src/test/kotlin/io/zell/zdb/v83/Version83Test.java b/backend/src/test/kotlin/io/zell/zdb/v83/Version83Test.java index d33a79d6..3d152cbd 100644 --- a/backend/src/test/kotlin/io/zell/zdb/v83/Version83Test.java +++ b/backend/src/test/kotlin/io/zell/zdb/v83/Version83Test.java @@ -32,7 +32,6 @@ import io.zell.zdb.TestUtils; import io.zell.zdb.ZeebeContentCreator; import io.zell.zdb.ZeebePaths; -import io.zell.zdb.latest.VersionLatestTest; import io.zell.zdb.log.LogContentReader; import io.zell.zdb.log.LogSearch; import io.zell.zdb.log.LogStatus; @@ -95,8 +94,8 @@ public class Version83Test { public class LargeLogTest { // earlier ZDB versions failed on large logs, because of segments were async created and incomplete private static final Logger LOGGER = - LoggerFactory.getLogger(VersionLatestTest.LargeLogTest.class); - private static final File TEMP_DIR = TestUtils.newTmpFolder(VersionLatestTest.LargeLogTest.class); + LoggerFactory.getLogger(LargeLogTest.class); + private static final File TEMP_DIR = TestUtils.newTmpFolder(LargeLogTest.class); private static final ZeebeContentCreator zeebeContentCreator = new ZeebeContentCreator(PROCESS); @Container public static ZeebeContainer zeebeContainer = new ZeebeContainer(DOCKER_IMAGE) @@ -151,8 +150,8 @@ public void shouldReadStatusFromLog() { public class ZeebeLogTest { private static final Logger LOGGER = - LoggerFactory.getLogger(VersionLatestTest.ZeebeLogTest.class); - private static final File TEMP_DIR = TestUtils.newTmpFolder(VersionLatestTest.ZeebeLogTest.class); + LoggerFactory.getLogger(ZeebeLogTest.class); + private static final File TEMP_DIR = TestUtils.newTmpFolder(ZeebeLogTest.class); private static final ZeebeContentCreator zeebeContentCreator = new ZeebeContentCreator(PROCESS); @Container public static ZeebeContainer zeebeContainer = new ZeebeContainer(DOCKER_IMAGE) From 1cb3b9f55f55389dbd6ffc7065ac21a3961ac666 Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Tue, 31 Oct 2023 22:31:13 +0100 Subject: [PATCH 05/11] test: adjust/update 8.2 test --- .../kotlin/io/zell/zdb/v82/Version82Test.java | 150 +++++++++++++++--- 1 file changed, 124 insertions(+), 26 deletions(-) diff --git a/backend/src/test/kotlin/io/zell/zdb/v82/Version82Test.java b/backend/src/test/kotlin/io/zell/zdb/v82/Version82Test.java index fa43bec7..814a0a35 100644 --- a/backend/src/test/kotlin/io/zell/zdb/v82/Version82Test.java +++ b/backend/src/test/kotlin/io/zell/zdb/v82/Version82Test.java @@ -22,6 +22,8 @@ import io.camunda.zeebe.model.bpmn.Bpmn; import io.camunda.zeebe.model.bpmn.BpmnModelInstance; import io.camunda.zeebe.protocol.ZbColumnFamilies; +import io.camunda.zeebe.protocol.record.RejectionType; +import io.camunda.zeebe.protocol.record.ValueType; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; import io.camunda.zeebe.protocol.record.value.BpmnElementType; import io.camunda.zeebe.protocol.record.value.ErrorType; @@ -30,6 +32,7 @@ import io.zell.zdb.TestUtils; import io.zell.zdb.ZeebeContentCreator; import io.zell.zdb.ZeebePaths; +import io.zell.zdb.latest.VersionLatestTest; import io.zell.zdb.log.LogContentReader; import io.zell.zdb.log.LogSearch; import io.zell.zdb.log.LogStatus; @@ -60,11 +63,10 @@ import java.io.File; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Base64; -import java.util.HashMap; -import java.util.List; +import java.util.*; +import java.util.stream.StreamSupport; +import static io.zell.zdb.TestUtils.TIMESTAMP_REGEX; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -127,9 +129,9 @@ public void shouldReadStatusFromLog() { final var status = logStatus.status(); // then - assertThat(status.getHighestIndex()).isEqualTo(211); + assertThat(status.getHighestIndex()).isEqualTo(213); assertThat(status.getHighestTerm()).isEqualTo(1); - assertThat(status.getHighestRecordPosition()).isEqualTo(258); + assertThat(status.getHighestRecordPosition()).isEqualTo(260); assertThat(status.getLowestIndex()).isEqualTo(1); assertThat(status.getLowestRecordPosition()).isEqualTo(1); @@ -141,7 +143,6 @@ public void shouldReadStatusFromLog() { .contains("lowestIndex"); } } - @Nested public class ZeebeLogTest { @@ -162,7 +163,6 @@ public class ZeebeLogTest { static { TEMP_DIR.mkdirs(); } - @BeforeAll public static void setup() { zeebeContentCreator @@ -184,9 +184,9 @@ public void shouldReadStatusFromLog() { final var status = logStatus.status(); // then - assertThat(status.getHighestIndex()).isEqualTo(13); + assertThat(status.getHighestIndex()).isEqualTo(15); assertThat(status.getHighestTerm()).isEqualTo(1); - assertThat(status.getHighestRecordPosition()).isEqualTo(60); + assertThat(status.getHighestRecordPosition()).isEqualTo(62); assertThat(status.getLowestIndex()).isEqualTo(1); assertThat(status.getLowestRecordPosition()).isEqualTo(1); @@ -240,6 +240,104 @@ public void shouldReadLogContentWithIterator() { verifyCompleteLog(records); } + @Test + public void shouldReadRejection() { + // given + final var logPath = ZeebePaths.Companion.getLogPath(TEMP_DIR, "1"); + var logContentReader = new LogContentReader(logPath); + + // when + logContentReader.filterForRejections(); + + // then + final var rejection = StreamSupport.stream(Spliterators.spliteratorUnknownSize(logContentReader, Spliterator.ORDERED), false) + .filter(persistedRecord -> persistedRecord instanceof ApplicationRecord) + .map(persistedRecord -> (ApplicationRecord) persistedRecord) + .flatMap(applicationRecord -> applicationRecord.getEntries().stream()) + .filter(record -> record.component8() != RejectionType.NULL_VAL) + .findFirst(); + + assertThat(rejection).isPresent(); + assertThat(rejection.get().component8()).isEqualTo(RejectionType.INVALID_ARGUMENT); + assertThat(rejection.get().component9()).isEqualTo("Expected to find process definition with process ID 'nonExisting', but none found"); + } + + @Test + public void shouldSerializeRejectionToJson() throws JsonProcessingException { + // given + final var expectedJson = OBJECT_MAPPER.readTree(""" + {"position":62,"sourceRecordPosition":61,"key":-1,"recordType":"COMMAND_REJECTION", + "valueType":"PROCESS_INSTANCE_CREATION","intent":"CREATE","rejectionType":"INVALID_ARGUMENT", + "rejectionReason":"Expected to find process definition with process ID 'nonExisting', but none found", + "requestId":-1,"requestStreamId":-2147483648,"protocolVersion":3,"brokerVersion":"2049.512.4096", + "recordVersion":20736,"authData":"", + "recordValue":{"bpmnProcessId":"nonExisting","processDefinitionKey":0,"processInstanceKey":-1, + "version":-1,"variables":"gA==","fetchVariables":[], + "startInstructions":[]}} + } +"""); + final var logPath = ZeebePaths.Companion.getLogPath(TEMP_DIR, "1"); + var logContentReader = new LogContentReader(logPath); + + // when + logContentReader.filterForRejections(); + + // then + final var rejection = StreamSupport.stream(Spliterators.spliteratorUnknownSize(logContentReader, Spliterator.ORDERED), false) + .filter(persistedRecord -> persistedRecord instanceof ApplicationRecord) + .map(persistedRecord -> (ApplicationRecord) persistedRecord) + .flatMap(applicationRecord -> applicationRecord.getEntries().stream()) + .filter(record -> !record.component8().equals(RejectionType.NULL_VAL.name())) + .findFirst(); + + assertThat(rejection).isPresent(); + assertThat(rejection.get().component8()).isEqualTo(RejectionType.INVALID_ARGUMENT); + assertThat(rejection.get().component9()).isEqualTo("Expected to find process definition with process ID 'nonExisting', but none found"); + + final var recordJson = rejection.get().toString() + .replaceFirst(TIMESTAMP_REGEX, ""); + final var actualJson = OBJECT_MAPPER.readTree(recordJson); + assertThat(actualJson).isNotNull(); // is valid json + assertThat(actualJson).isEqualTo(expectedJson); + } + + @Test + public void shouldSerializeRecordToJson() throws JsonProcessingException { + // given + final var expectedJson = OBJECT_MAPPER.readTree(""" + {"position":13,"sourceRecordPosition":6,"key":2251799813685252,"recordType":"EVENT", + "valueType":"PROCESS_INSTANCE","intent":"ELEMENT_ACTIVATED","rejectionType":"INVALID_ARGUMENT", + "requestId":-1,"requestStreamId":-2147483648,"protocolVersion":3,"brokerVersion":"2303.512.4096", + "recordVersion":1,"authData":"", + "recordValue":{"bpmnElementType":"PROCESS","elementId":"process","bpmnProcessId":"process", + "version":1,"processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685252, + "flowScopeKey":-1,"bpmnEventType":"UNSPECIFIED","parentProcessInstanceKey":-1, + "parentElementInstanceKey":-1}} +"""); + final var logPath = ZeebePaths.Companion.getLogPath(TEMP_DIR, "1"); + var logContentReader = new LogContentReader(logPath); + logContentReader.filterForProcessInstance(zeebeContentCreator.processInstanceEvent.getProcessInstanceKey()); + + // when + final var piActivated = StreamSupport.stream(Spliterators.spliteratorUnknownSize(logContentReader, Spliterator.ORDERED), false) + .filter(persistedRecord -> persistedRecord instanceof ApplicationRecord) + .map(persistedRecord -> (ApplicationRecord) persistedRecord) + .flatMap(applicationRecord -> applicationRecord.getEntries().stream()) + .filter(record -> record.component6() == ValueType.PROCESS_INSTANCE) + .filter(record -> record.component7() == ProcessInstanceIntent.ELEMENT_ACTIVATED) + .filter(record -> record.getPiRelatedValue() != null) + .filter(record -> record.getPiRelatedValue().getBpmnElementType() == BpmnElementType.PROCESS) + .findFirst(); + + // then + assertThat(piActivated).isPresent(); + final var recordJson = piActivated.get().toString() + .replaceFirst(TIMESTAMP_REGEX, ""); + final var actualJson = OBJECT_MAPPER.readTree(recordJson); + assertThat(actualJson).isNotNull(); // is valid json + assertThat(actualJson).isEqualTo(expectedJson); + } + @Test public void shouldSkipFirstPartOfLog() { // given @@ -252,13 +350,13 @@ public void shouldSkipFirstPartOfLog() { logContentReader.forEachRemaining(records::add); // then - assertThat(records).hasSize(9); + assertThat(records).hasSize(11); // we skip the first raft record assertThat(records.stream().filter(RaftRecord.class::isInstance).count()).isEqualTo(0); - assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(9); + assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(11); final var maxIndex = records.stream().map(PersistedRecord::index).max(Long::compareTo).get(); - assertThat(maxIndex).isEqualTo(13); + assertThat(maxIndex).isEqualTo(15); final var minIndex = records.stream().map(PersistedRecord::index).min(Long::compareTo).get(); assertThat(minIndex).isEqualTo(5); @@ -268,7 +366,7 @@ public void shouldSkipFirstPartOfLog() { .map(ApplicationRecord::getHighestPosition) .max(Long::compareTo) .orElseThrow(); - assertThat(maxPosition).isEqualTo(60); + assertThat(maxPosition).isEqualTo(62); final var minPosition = records.stream() .filter(ApplicationRecord.class::isInstance) .map(ApplicationRecord.class::cast) @@ -325,9 +423,9 @@ public void shouldSeekToEndOfLogIfNoExistingSeek() { assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(1); final var maxIndex = records.stream().map(PersistedRecord::index).max(Long::compareTo).get(); - assertThat(maxIndex).isEqualTo(13); + assertThat(maxIndex).isEqualTo(15); final var minIndex = records.stream().map(PersistedRecord::index).min(Long::compareTo).get(); - assertThat(minIndex).isEqualTo(13); + assertThat(minIndex).isEqualTo(15); final var maxPosition = records.stream() .filter(ApplicationRecord.class::isInstance) @@ -335,14 +433,14 @@ public void shouldSeekToEndOfLogIfNoExistingSeek() { .map(ApplicationRecord::getHighestPosition) .max(Long::compareTo) .orElseThrow(); - assertThat(maxPosition).isEqualTo(60); + assertThat(maxPosition).isEqualTo(62); final var minPosition = records.stream() .filter(ApplicationRecord.class::isInstance) .map(ApplicationRecord.class::cast) .map(ApplicationRecord::getLowestPosition) .min(Long::compareTo) .orElseThrow(); - assertThat(minPosition).isEqualTo(60); + assertThat(minPosition).isEqualTo(62); } @Test @@ -601,12 +699,12 @@ public void shouldFilterWithProcessInstanceKeyAndSetBeginAndEndOfLogPosition() { } private static void verifyCompleteLog(List records) { - assertThat(records).hasSize(13); + assertThat(records).hasSize(15); assertThat(records.stream().filter(RaftRecord.class::isInstance).count()).isEqualTo(1); - assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(12); + assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(14); final var maxIndex = records.stream().map(PersistedRecord::index).max(Long::compareTo).get(); - assertThat(maxIndex).isEqualTo(13); + assertThat(maxIndex).isEqualTo(15); final var minIndex = records.stream().map(PersistedRecord::index).min(Long::compareTo).get(); assertThat(minIndex).isEqualTo(1); @@ -616,7 +714,7 @@ private static void verifyCompleteLog(List records) { .map(ApplicationRecord::getHighestPosition) .max(Long::compareTo) .orElseThrow(); - assertThat(maxPosition).isEqualTo(60); + assertThat(maxPosition).isEqualTo(62); final var minPosition = records.stream() .filter(ApplicationRecord.class::isInstance) .map(ApplicationRecord.class::cast) @@ -667,7 +765,7 @@ public void shouldSearchPositionInLog() { final var position = 1; // when - final io.zell.zdb.log.records.Record record = logSearch.searchPosition(position); + final Record record = logSearch.searchPosition(position); // then assertThat(record).isNotNull(); @@ -681,7 +779,7 @@ public void shouldReturnNullOnNegPosition() { var logSearch = new LogSearch(logPath); // when - final io.zell.zdb.log.records.Record record = logSearch.searchPosition(-1); + final Record record = logSearch.searchPosition(-1); // then assertThat(record).isNull(); @@ -694,7 +792,7 @@ public void shouldReturnNullOnToBigPosition() { var logSearch = new LogSearch(logPath); // when - final io.zell.zdb.log.records.Record record = logSearch.searchPosition(Long.MAX_VALUE); + final Record record = logSearch.searchPosition(Long.MAX_VALUE); // then assertThat(record).isNull(); @@ -729,7 +827,7 @@ final var record = logSearch.searchIndex(index); assertThat(record) .asInstanceOf(InstanceOfAssertFactories.type(ApplicationRecord.class)) .extracting(ApplicationRecord::getEntries) - .asInstanceOf(InstanceOfAssertFactories.list(io.zell.zdb.log.records.Record.class)) + .asInstanceOf(InstanceOfAssertFactories.list(Record.class)) .extracting(Record::getPosition) .doesNotHaveDuplicates(); } From 98579daf489a75124800434ee2918144afd967ec Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Sat, 4 Nov 2023 21:14:11 +0100 Subject: [PATCH 06/11] fix: add old RecordMetadata classes Old metadata in order to read BrokerVersion and RejectionType before 8.3 related to https://github.com/Zelldon/zdb/issues/321 --- .../records/old/RecordMetadataBefore83.java | 299 ++++++++ .../old/RecordMetadataDecoderBefore83.java | 696 ++++++++++++++++++ .../old/RecordMetadataEncoderBefore83.java | 562 ++++++++++++++ .../kotlin/io/zell/zdb/v81/Version81Test.java | 137 +++- 4 files changed, 1671 insertions(+), 23 deletions(-) create mode 100644 backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataBefore83.java create mode 100644 backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataDecoderBefore83.java create mode 100644 backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataEncoderBefore83.java diff --git a/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataBefore83.java b/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataBefore83.java new file mode 100644 index 00000000..473b92a1 --- /dev/null +++ b/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataBefore83.java @@ -0,0 +1,299 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Zeebe Community License 1.1. You may not use this file + * except in compliance with the Zeebe Community License 1.1. + */ +package io.zell.zdb.log.records.old; + +import io.camunda.zeebe.protocol.Protocol; +import io.camunda.zeebe.protocol.impl.record.VersionInfo; +import io.camunda.zeebe.protocol.record.*; +import io.camunda.zeebe.protocol.record.intent.Intent; +import io.camunda.zeebe.util.VersionUtil; +import io.camunda.zeebe.util.buffer.BufferReader; +import io.camunda.zeebe.util.buffer.BufferUtil; +import io.camunda.zeebe.util.buffer.BufferWriter; +import org.agrona.DirectBuffer; +import org.agrona.MutableDirectBuffer; +import org.agrona.concurrent.UnsafeBuffer; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.Optional; + +public final class RecordMetadataBefore83 implements BufferWriter, BufferReader { + public static final int BLOCK_LENGTH = + MessageHeaderEncoder.ENCODED_LENGTH + RecordMetadataEncoderBefore83.BLOCK_LENGTH; + + private static final VersionInfo CURRENT_BROKER_VERSION = + VersionInfo.parse(VersionUtil.getVersion()); + + private final MessageHeaderEncoder headerEncoder = new MessageHeaderEncoder(); + private final MessageHeaderDecoder headerDecoder = new MessageHeaderDecoder(); + private final RecordMetadataEncoderBefore83 encoder = new RecordMetadataEncoderBefore83(); + private final RecordMetadataDecoderBefore83 decoder = new RecordMetadataDecoderBefore83(); + + private RecordType recordType = RecordType.NULL_VAL; + private ValueType valueType = ValueType.NULL_VAL; + private Intent intent = null; + private long requestId; + private short intentValue = Intent.NULL_VAL; + private int requestStreamId; + private RejectionType rejectionType; + private final UnsafeBuffer rejectionReason = new UnsafeBuffer(0, 0); + + // always the current version by default + private int protocolVersion = 3; + private VersionInfo brokerVersion = CURRENT_BROKER_VERSION; + + public RecordMetadataBefore83() { + reset(); + } + + @Override + public void wrap(final DirectBuffer buffer, int offset, final int length) { + reset(); + + headerDecoder.wrap(buffer, offset); + + offset += headerDecoder.encodedLength(); + + decoder.wrap(buffer, offset, headerDecoder.blockLength(), headerDecoder.version()); + + recordType = decoder.recordType(); + requestStreamId = decoder.requestStreamId(); + requestId = decoder.requestId(); + protocolVersion = decoder.protocolVersion(); + valueType = decoder.valueType(); + intent = Intent.fromProtocolValue(valueType, decoder.intent()); + rejectionType = decoder.rejectionType(); + + brokerVersion = + Optional.ofNullable(decoder.brokerVersion()) + .map( + versionDecoder -> + new VersionInfo( + versionDecoder.majorVersion(), + versionDecoder.minorVersion(), + versionDecoder.patchVersion())) + .orElse(VersionInfo.UNKNOWN); + + final int rejectionReasonLength = decoder.rejectionReasonLength(); + + if (rejectionReasonLength > 0) { + offset += headerDecoder.blockLength(); + offset += RecordMetadataDecoderBefore83.rejectionReasonHeaderLength(); + + rejectionReason.wrap(buffer, offset, rejectionReasonLength); + } + } + + @Override + public int getLength() { + return BLOCK_LENGTH + + RecordMetadataEncoderBefore83.rejectionReasonHeaderLength() + + rejectionReason.capacity(); + } + + @Override + public void write(final MutableDirectBuffer buffer, int offset) { + headerEncoder.wrap(buffer, offset); + + headerEncoder + .blockLength(encoder.sbeBlockLength()) + .templateId(encoder.sbeTemplateId()) + .schemaId(encoder.sbeSchemaId()) + .version(encoder.sbeSchemaVersion()); + + offset += headerEncoder.encodedLength(); + + encoder.wrap(buffer, offset); + + encoder + .recordType(recordType) + .requestStreamId(requestStreamId) + .requestId(requestId) + .protocolVersion(protocolVersion) + .valueType(valueType) + .intent(intentValue) + .rejectionType(rejectionType); + + encoder + .brokerVersion() + .majorVersion(brokerVersion.getMajorVersion()) + .minorVersion(brokerVersion.getMinorVersion()) + .patchVersion(brokerVersion.getPatchVersion()); + + offset += RecordMetadataEncoderBefore83.BLOCK_LENGTH; + + if (rejectionReason.capacity() > 0) { + encoder.putRejectionReason(rejectionReason, 0, rejectionReason.capacity()); + } else { + buffer.putInt(offset, 0); + } + } + + public long getRequestId() { + return requestId; + } + + public RecordMetadataBefore83 requestId(final long requestId) { + this.requestId = requestId; + return this; + } + + public int getRequestStreamId() { + return requestStreamId; + } + + public RecordMetadataBefore83 requestStreamId(final int requestStreamId) { + this.requestStreamId = requestStreamId; + return this; + } + + public RecordMetadataBefore83 protocolVersion(final int protocolVersion) { + this.protocolVersion = protocolVersion; + return this; + } + + public int getProtocolVersion() { + return protocolVersion; + } + + public ValueType getValueType() { + return valueType; + } + + public RecordMetadataBefore83 valueType(final ValueType eventType) { + valueType = eventType; + return this; + } + + public RecordMetadataBefore83 intent(final Intent intent) { + this.intent = intent; + intentValue = intent.value(); + return this; + } + + public Intent getIntent() { + return intent; + } + + public RecordMetadataBefore83 recordType(final RecordType recordType) { + this.recordType = recordType; + return this; + } + + public RecordType getRecordType() { + return recordType; + } + + public RecordMetadataBefore83 rejectionType(final RejectionType rejectionType) { + this.rejectionType = rejectionType; + return this; + } + + public RejectionType getRejectionType() { + return rejectionType; + } + + public RecordMetadataBefore83 rejectionReason(final String rejectionReason) { + final byte[] bytes = rejectionReason.getBytes(StandardCharsets.UTF_8); + this.rejectionReason.wrap(bytes); + return this; + } + + public RecordMetadataBefore83 rejectionReason(final DirectBuffer buffer) { + rejectionReason.wrap(buffer); + return this; + } + + public String getRejectionReason() { + return BufferUtil.bufferAsString(rejectionReason); + } + + public RecordMetadataBefore83 brokerVersion(final VersionInfo brokerVersion) { + this.brokerVersion = brokerVersion; + return this; + } + + public VersionInfo getBrokerVersion() { + return brokerVersion; + } + + public RecordMetadataBefore83 reset() { + recordType = RecordType.NULL_VAL; + requestId = RecordMetadataEncoderBefore83.requestIdNullValue(); + requestStreamId = RecordMetadataEncoderBefore83.requestStreamIdNullValue(); + protocolVersion = Protocol.PROTOCOL_VERSION; + valueType = ValueType.NULL_VAL; + intentValue = Intent.NULL_VAL; + intent = null; + rejectionType = RejectionType.NULL_VAL; + rejectionReason.wrap(0, 0); + brokerVersion = CURRENT_BROKER_VERSION; + return this; + } + + @Override + public int hashCode() { + return Objects.hash( + requestId, + valueType, + recordType, + intentValue, + requestStreamId, + rejectionType, + rejectionReason, + protocolVersion, + brokerVersion); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final RecordMetadataBefore83 that = (RecordMetadataBefore83) o; + return requestId == that.requestId + && intentValue == that.intentValue + && requestStreamId == that.requestStreamId + && protocolVersion == that.protocolVersion + && valueType == that.valueType + && recordType == that.recordType + && rejectionType == that.rejectionType + && rejectionReason.equals(that.rejectionReason) + && brokerVersion.equals(that.brokerVersion); + } + + @Override + public String toString() { + // The toString is intentionally cut-down to the only important properties for debugging + // (mostly for tests). + // If the record is already written to the log (in production) we have other ways to make + // it readable again. + final var builder = + new StringBuilder( + "RecordMetadata{" + + "recordType=" + + recordType + + ", valueType=" + + valueType + + ", intent=" + + intent); + if (!rejectionType.equals(RejectionType.NULL_VAL)) { + builder.append(", rejectionType=").append(rejectionType); + } + if (rejectionReason.capacity() > 0) { + builder.append(", rejectionReason=").append(BufferUtil.bufferAsString(rejectionReason)); + } + + builder.append('}'); + return builder.toString(); + } +} diff --git a/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataDecoderBefore83.java b/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataDecoderBefore83.java new file mode 100644 index 00000000..282e76d5 --- /dev/null +++ b/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataDecoderBefore83.java @@ -0,0 +1,696 @@ +/* Generated SBE (Simple Binary Encoding) message codec. */ +package io.zell.zdb.log.records.old; + +import io.camunda.zeebe.protocol.record.*; +import org.agrona.MutableDirectBuffer; +import org.agrona.DirectBuffer; +import org.agrona.sbe.*; + + +/** + * Descriptor for Record Metadata + */ +@SuppressWarnings("all") +public final class RecordMetadataDecoderBefore83 implements MessageDecoderFlyweight +{ + public static final int BLOCK_LENGTH = 30; + public static final int TEMPLATE_ID = 200; + public static final int SCHEMA_ID = 0; + public static final int SCHEMA_VERSION = 3; + public static final String SEMANTIC_VERSION = "8.2.15"; + public static final java.nio.ByteOrder BYTE_ORDER = java.nio.ByteOrder.LITTLE_ENDIAN; + + private final RecordMetadataDecoderBefore83 parentMessage = this; + private DirectBuffer buffer; + private int initialOffset; + private int offset; + private int limit; + int actingBlockLength; + int actingVersion; + + public int sbeBlockLength() + { + return BLOCK_LENGTH; + } + + public int sbeTemplateId() + { + return TEMPLATE_ID; + } + + public int sbeSchemaId() + { + return SCHEMA_ID; + } + + public int sbeSchemaVersion() + { + return SCHEMA_VERSION; + } + + public String sbeSemanticType() + { + return ""; + } + + public DirectBuffer buffer() + { + return buffer; + } + + public int initialOffset() + { + return initialOffset; + } + + public int offset() + { + return offset; + } + + public RecordMetadataDecoderBefore83 wrap( + final DirectBuffer buffer, + final int offset, + final int actingBlockLength, + final int actingVersion) + { + if (buffer != this.buffer) + { + this.buffer = buffer; + } + this.initialOffset = offset; + this.offset = offset; + this.actingBlockLength = actingBlockLength; + this.actingVersion = actingVersion; + limit(offset + actingBlockLength); + + return this; + } + + public RecordMetadataDecoderBefore83 wrapAndApplyHeader( + final DirectBuffer buffer, + final int offset, + final MessageHeaderDecoder headerDecoder) + { + headerDecoder.wrap(buffer, offset); + + final int templateId = headerDecoder.templateId(); + if (TEMPLATE_ID != templateId) + { + throw new IllegalStateException("Invalid TEMPLATE_ID: " + templateId); + } + + return wrap( + buffer, + offset + MessageHeaderDecoder.ENCODED_LENGTH, + headerDecoder.blockLength(), + headerDecoder.version()); + } + + public RecordMetadataDecoderBefore83 sbeRewind() + { + return wrap(buffer, initialOffset, actingBlockLength, actingVersion); + } + + public int sbeDecodedLength() + { + final int currentLimit = limit(); + sbeSkip(); + final int decodedLength = encodedLength(); + limit(currentLimit); + + return decodedLength; + } + + public int encodedLength() + { + return limit - offset; + } + + public int limit() + { + return limit; + } + + public void limit(final int limit) + { + this.limit = limit; + } + + public static int recordTypeId() + { + return 1; + } + + public static int recordTypeSinceVersion() + { + return 0; + } + + public static int recordTypeEncodingOffset() + { + return 0; + } + + public static int recordTypeEncodingLength() + { + return 1; + } + + public static String recordTypeMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public short recordTypeRaw() + { + return ((short)(buffer.getByte(offset + 0) & 0xFF)); + } + + public RecordType recordType() + { + return RecordType.get(((short)(buffer.getByte(offset + 0) & 0xFF))); + } + + + public static int requestStreamIdId() + { + return 2; + } + + public static int requestStreamIdSinceVersion() + { + return 0; + } + + public static int requestStreamIdEncodingOffset() + { + return 1; + } + + public static int requestStreamIdEncodingLength() + { + return 4; + } + + public static String requestStreamIdMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public static int requestStreamIdNullValue() + { + return -2147483648; + } + + public static int requestStreamIdMinValue() + { + return -2147483647; + } + + public static int requestStreamIdMaxValue() + { + return 2147483647; + } + + public int requestStreamId() + { + return buffer.getInt(offset + 1, java.nio.ByteOrder.LITTLE_ENDIAN); + } + + + public static int requestIdId() + { + return 3; + } + + public static int requestIdSinceVersion() + { + return 0; + } + + public static int requestIdEncodingOffset() + { + return 5; + } + + public static int requestIdEncodingLength() + { + return 8; + } + + public static String requestIdMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public static long requestIdNullValue() + { + return 0xffffffffffffffffL; + } + + public static long requestIdMinValue() + { + return 0x0L; + } + + public static long requestIdMaxValue() + { + return 0xfffffffffffffffeL; + } + + public long requestId() + { + return buffer.getLong(offset + 5, java.nio.ByteOrder.LITTLE_ENDIAN); + } + + + public static int protocolVersionId() + { + return 4; + } + + public static int protocolVersionSinceVersion() + { + return 0; + } + + public static int protocolVersionEncodingOffset() + { + return 13; + } + + public static int protocolVersionEncodingLength() + { + return 2; + } + + public static String protocolVersionMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public static int protocolVersionNullValue() + { + return 65535; + } + + public static int protocolVersionMinValue() + { + return 0; + } + + public static int protocolVersionMaxValue() + { + return 65534; + } + + public int protocolVersion() + { + return (buffer.getShort(offset + 13, java.nio.ByteOrder.LITTLE_ENDIAN) & 0xFFFF); + } + + + public static int valueTypeId() + { + return 5; + } + + public static int valueTypeSinceVersion() + { + return 0; + } + + public static int valueTypeEncodingOffset() + { + return 15; + } + + public static int valueTypeEncodingLength() + { + return 1; + } + + public static String valueTypeMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public short valueTypeRaw() + { + return ((short)(buffer.getByte(offset + 15) & 0xFF)); + } + + public ValueType valueType() + { + return ValueType.get(((short)(buffer.getByte(offset + 15) & 0xFF))); + } + + + public static int intentId() + { + return 6; + } + + public static int intentSinceVersion() + { + return 0; + } + + public static int intentEncodingOffset() + { + return 16; + } + + public static int intentEncodingLength() + { + return 1; + } + + public static String intentMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public static short intentNullValue() + { + return (short)255; + } + + public static short intentMinValue() + { + return (short)0; + } + + public static short intentMaxValue() + { + return (short)254; + } + + public short intent() + { + return ((short)(buffer.getByte(offset + 16) & 0xFF)); + } + + + public static int rejectionTypeId() + { + return 7; + } + + public static int rejectionTypeSinceVersion() + { + return 0; + } + + public static int rejectionTypeEncodingOffset() + { + return 17; + } + + public static int rejectionTypeEncodingLength() + { + return 1; + } + + public static String rejectionTypeMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public short rejectionTypeRaw() + { + return ((short)(buffer.getByte(offset + 17) & 0xFF)); + } + + public RejectionType rejectionType() + { + return RejectionType.get(((short)(buffer.getByte(offset + 17) & 0xFF))); + } + + + public static int brokerVersionId() + { + return 9; + } + + public static int brokerVersionSinceVersion() + { + return 2; + } + + public static int brokerVersionEncodingOffset() + { + return 18; + } + + public static int brokerVersionEncodingLength() + { + return 12; + } + + public static String brokerVersionMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "optional"; + } + + return ""; + } + + private final VersionDecoder brokerVersion = new VersionDecoder(); + + public VersionDecoder brokerVersion() + { + if (parentMessage.actingVersion < 2) + { + return null; + } + + brokerVersion.wrap(buffer, offset + 18); + return brokerVersion; + } + + public static int rejectionReasonId() + { + return 8; + } + + public static int rejectionReasonSinceVersion() + { + return 0; + } + + public static String rejectionReasonCharacterEncoding() + { + return java.nio.charset.StandardCharsets.UTF_8.name(); + } + + public static String rejectionReasonMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public static int rejectionReasonHeaderLength() + { + return 4; + } + + public int rejectionReasonLength() + { + final int limit = parentMessage.limit(); + return (int)(buffer.getInt(limit, java.nio.ByteOrder.LITTLE_ENDIAN) & 0xFFFF_FFFFL); + } + + public int skipRejectionReason() + { + final int headerLength = 4; + final int limit = parentMessage.limit(); + final int dataLength = (int)(buffer.getInt(limit, java.nio.ByteOrder.LITTLE_ENDIAN) & 0xFFFF_FFFFL); + final int dataOffset = limit + headerLength; + parentMessage.limit(dataOffset + dataLength); + + return dataLength; + } + + public int getRejectionReason(final MutableDirectBuffer dst, final int dstOffset, final int length) + { + final int headerLength = 4; + final int limit = parentMessage.limit(); + final int dataLength = (int)(buffer.getInt(limit, java.nio.ByteOrder.LITTLE_ENDIAN) & 0xFFFF_FFFFL); + final int bytesCopied = Math.min(length, dataLength); + parentMessage.limit(limit + headerLength + dataLength); + buffer.getBytes(limit + headerLength, dst, dstOffset, bytesCopied); + + return bytesCopied; + } + + public int getRejectionReason(final byte[] dst, final int dstOffset, final int length) + { + final int headerLength = 4; + final int limit = parentMessage.limit(); + final int dataLength = (int)(buffer.getInt(limit, java.nio.ByteOrder.LITTLE_ENDIAN) & 0xFFFF_FFFFL); + final int bytesCopied = Math.min(length, dataLength); + parentMessage.limit(limit + headerLength + dataLength); + buffer.getBytes(limit + headerLength, dst, dstOffset, bytesCopied); + + return bytesCopied; + } + + public void wrapRejectionReason(final DirectBuffer wrapBuffer) + { + final int headerLength = 4; + final int limit = parentMessage.limit(); + final int dataLength = (int)(buffer.getInt(limit, java.nio.ByteOrder.LITTLE_ENDIAN) & 0xFFFF_FFFFL); + parentMessage.limit(limit + headerLength + dataLength); + wrapBuffer.wrap(buffer, limit + headerLength, dataLength); + } + + public String rejectionReason() + { + final int headerLength = 4; + final int limit = parentMessage.limit(); + final int dataLength = (int)(buffer.getInt(limit, java.nio.ByteOrder.LITTLE_ENDIAN) & 0xFFFF_FFFFL); + parentMessage.limit(limit + headerLength + dataLength); + + if (0 == dataLength) + { + return ""; + } + + final byte[] tmp = new byte[dataLength]; + buffer.getBytes(limit + headerLength, tmp, 0, dataLength); + + return new String(tmp, java.nio.charset.StandardCharsets.UTF_8); + } + + public String toString() + { + if (null == buffer) + { + return ""; + } + + final RecordMetadataDecoderBefore83 decoder = new RecordMetadataDecoderBefore83(); + decoder.wrap(buffer, initialOffset, actingBlockLength, actingVersion); + + return decoder.appendTo(new StringBuilder()).toString(); + } + + public StringBuilder appendTo(final StringBuilder builder) + { + if (null == buffer) + { + return builder; + } + + final int originalLimit = limit(); + limit(initialOffset + actingBlockLength); + builder.append("[RecordMetadata](sbeTemplateId="); + builder.append(TEMPLATE_ID); + builder.append("|sbeSchemaId="); + builder.append(SCHEMA_ID); + builder.append("|sbeSchemaVersion="); + if (parentMessage.actingVersion != SCHEMA_VERSION) + { + builder.append(parentMessage.actingVersion); + builder.append('/'); + } + builder.append(SCHEMA_VERSION); + builder.append("|sbeBlockLength="); + if (actingBlockLength != BLOCK_LENGTH) + { + builder.append(actingBlockLength); + builder.append('/'); + } + builder.append(BLOCK_LENGTH); + builder.append("):"); + builder.append("recordType="); + builder.append(this.recordType()); + builder.append('|'); + builder.append("requestStreamId="); + builder.append(this.requestStreamId()); + builder.append('|'); + builder.append("requestId="); + builder.append(this.requestId()); + builder.append('|'); + builder.append("protocolVersion="); + builder.append(this.protocolVersion()); + builder.append('|'); + builder.append("valueType="); + builder.append(this.valueType()); + builder.append('|'); + builder.append("intent="); + builder.append(this.intent()); + builder.append('|'); + builder.append("rejectionType="); + builder.append(this.rejectionType()); + builder.append('|'); + builder.append("brokerVersion="); + final VersionDecoder brokerVersion = this.brokerVersion(); + if (brokerVersion != null) + { + brokerVersion.appendTo(builder); + } + else + { + builder.append("null"); + } + builder.append('|'); + builder.append("rejectionReason="); + builder.append('\'').append(rejectionReason()).append('\''); + + limit(originalLimit); + + return builder; + } + + public RecordMetadataDecoderBefore83 sbeSkip() + { + sbeRewind(); + skipRejectionReason(); + + return this; + } +} diff --git a/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataEncoderBefore83.java b/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataEncoderBefore83.java new file mode 100644 index 00000000..58baebd1 --- /dev/null +++ b/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataEncoderBefore83.java @@ -0,0 +1,562 @@ +/* Generated SBE (Simple Binary Encoding) message codec. */ +package io.zell.zdb.log.records.old; + +import io.camunda.zeebe.protocol.record.*; +import org.agrona.MutableDirectBuffer; +import org.agrona.DirectBuffer; +import org.agrona.sbe.*; + + +/** + * Descriptor for Record Metadata + */ +@SuppressWarnings("all") +public final class RecordMetadataEncoderBefore83 implements MessageEncoderFlyweight +{ + public static final int BLOCK_LENGTH = 30; + public static final int TEMPLATE_ID = 200; + public static final int SCHEMA_ID = 0; + public static final int SCHEMA_VERSION = 3; + public static final String SEMANTIC_VERSION = "8.2.15"; + public static final java.nio.ByteOrder BYTE_ORDER = java.nio.ByteOrder.LITTLE_ENDIAN; + + private final RecordMetadataEncoderBefore83 parentMessage = this; + private MutableDirectBuffer buffer; + private int initialOffset; + private int offset; + private int limit; + + public int sbeBlockLength() + { + return BLOCK_LENGTH; + } + + public int sbeTemplateId() + { + return TEMPLATE_ID; + } + + public int sbeSchemaId() + { + return SCHEMA_ID; + } + + public int sbeSchemaVersion() + { + return SCHEMA_VERSION; + } + + public String sbeSemanticType() + { + return ""; + } + + public MutableDirectBuffer buffer() + { + return buffer; + } + + public int initialOffset() + { + return initialOffset; + } + + public int offset() + { + return offset; + } + + public RecordMetadataEncoderBefore83 wrap(final MutableDirectBuffer buffer, final int offset) + { + if (buffer != this.buffer) + { + this.buffer = buffer; + } + this.initialOffset = offset; + this.offset = offset; + limit(offset + BLOCK_LENGTH); + + return this; + } + + public RecordMetadataEncoderBefore83 wrapAndApplyHeader( + final MutableDirectBuffer buffer, final int offset, final MessageHeaderEncoder headerEncoder) + { + headerEncoder + .wrap(buffer, offset) + .blockLength(BLOCK_LENGTH) + .templateId(TEMPLATE_ID) + .schemaId(SCHEMA_ID) + .version(SCHEMA_VERSION); + + return wrap(buffer, offset + MessageHeaderEncoder.ENCODED_LENGTH); + } + + public int encodedLength() + { + return limit - offset; + } + + public int limit() + { + return limit; + } + + public void limit(final int limit) + { + this.limit = limit; + } + + public static int recordTypeId() + { + return 1; + } + + public static int recordTypeSinceVersion() + { + return 0; + } + + public static int recordTypeEncodingOffset() + { + return 0; + } + + public static int recordTypeEncodingLength() + { + return 1; + } + + public static String recordTypeMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public RecordMetadataEncoderBefore83 recordType(final RecordType value) + { + buffer.putByte(offset + 0, (byte)value.value()); + return this; + } + + public static int requestStreamIdId() + { + return 2; + } + + public static int requestStreamIdSinceVersion() + { + return 0; + } + + public static int requestStreamIdEncodingOffset() + { + return 1; + } + + public static int requestStreamIdEncodingLength() + { + return 4; + } + + public static String requestStreamIdMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public static int requestStreamIdNullValue() + { + return -2147483648; + } + + public static int requestStreamIdMinValue() + { + return -2147483647; + } + + public static int requestStreamIdMaxValue() + { + return 2147483647; + } + + public RecordMetadataEncoderBefore83 requestStreamId(final int value) + { + buffer.putInt(offset + 1, value, java.nio.ByteOrder.LITTLE_ENDIAN); + return this; + } + + + public static int requestIdId() + { + return 3; + } + + public static int requestIdSinceVersion() + { + return 0; + } + + public static int requestIdEncodingOffset() + { + return 5; + } + + public static int requestIdEncodingLength() + { + return 8; + } + + public static String requestIdMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public static long requestIdNullValue() + { + return 0xffffffffffffffffL; + } + + public static long requestIdMinValue() + { + return 0x0L; + } + + public static long requestIdMaxValue() + { + return 0xfffffffffffffffeL; + } + + public RecordMetadataEncoderBefore83 requestId(final long value) + { + buffer.putLong(offset + 5, value, java.nio.ByteOrder.LITTLE_ENDIAN); + return this; + } + + + public static int protocolVersionId() + { + return 4; + } + + public static int protocolVersionSinceVersion() + { + return 0; + } + + public static int protocolVersionEncodingOffset() + { + return 13; + } + + public static int protocolVersionEncodingLength() + { + return 2; + } + + public static String protocolVersionMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public static int protocolVersionNullValue() + { + return 65535; + } + + public static int protocolVersionMinValue() + { + return 0; + } + + public static int protocolVersionMaxValue() + { + return 65534; + } + + public RecordMetadataEncoderBefore83 protocolVersion(final int value) + { + buffer.putShort(offset + 13, (short)value, java.nio.ByteOrder.LITTLE_ENDIAN); + return this; + } + + + public static int valueTypeId() + { + return 5; + } + + public static int valueTypeSinceVersion() + { + return 0; + } + + public static int valueTypeEncodingOffset() + { + return 15; + } + + public static int valueTypeEncodingLength() + { + return 1; + } + + public static String valueTypeMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public RecordMetadataEncoderBefore83 valueType(final ValueType value) + { + buffer.putByte(offset + 15, (byte)value.value()); + return this; + } + + public static int intentId() + { + return 6; + } + + public static int intentSinceVersion() + { + return 0; + } + + public static int intentEncodingOffset() + { + return 16; + } + + public static int intentEncodingLength() + { + return 1; + } + + public static String intentMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public static short intentNullValue() + { + return (short)255; + } + + public static short intentMinValue() + { + return (short)0; + } + + public static short intentMaxValue() + { + return (short)254; + } + + public RecordMetadataEncoderBefore83 intent(final short value) + { + buffer.putByte(offset + 16, (byte)value); + return this; + } + + + public static int rejectionTypeId() + { + return 7; + } + + public static int rejectionTypeSinceVersion() + { + return 0; + } + + public static int rejectionTypeEncodingOffset() + { + return 17; + } + + public static int rejectionTypeEncodingLength() + { + return 1; + } + + public static String rejectionTypeMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public RecordMetadataEncoderBefore83 rejectionType(final RejectionType value) + { + buffer.putByte(offset + 17, (byte)value.value()); + return this; + } + + public static int brokerVersionId() + { + return 9; + } + + public static int brokerVersionSinceVersion() + { + return 2; + } + + public static int brokerVersionEncodingOffset() + { + return 18; + } + + public static int brokerVersionEncodingLength() + { + return 12; + } + + public static String brokerVersionMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "optional"; + } + + return ""; + } + + private final VersionEncoder brokerVersion = new VersionEncoder(); + + public VersionEncoder brokerVersion() + { + brokerVersion.wrap(buffer, offset + 18); + return brokerVersion; + } + + public static int rejectionReasonId() + { + return 8; + } + + public static String rejectionReasonCharacterEncoding() + { + return java.nio.charset.StandardCharsets.UTF_8.name(); + } + + public static String rejectionReasonMetaAttribute(final MetaAttribute metaAttribute) + { + if (MetaAttribute.PRESENCE == metaAttribute) + { + return "required"; + } + + return ""; + } + + public static int rejectionReasonHeaderLength() + { + return 4; + } + + public RecordMetadataEncoderBefore83 putRejectionReason(final DirectBuffer src, final int srcOffset, final int length) + { + if (length > 2147483647) + { + throw new IllegalStateException("length > maxValue for type: " + length); + } + + final int headerLength = 4; + final int limit = parentMessage.limit(); + parentMessage.limit(limit + headerLength + length); + buffer.putInt(limit, length, java.nio.ByteOrder.LITTLE_ENDIAN); + buffer.putBytes(limit + headerLength, src, srcOffset, length); + + return this; + } + + public RecordMetadataEncoderBefore83 putRejectionReason(final byte[] src, final int srcOffset, final int length) + { + if (length > 2147483647) + { + throw new IllegalStateException("length > maxValue for type: " + length); + } + + final int headerLength = 4; + final int limit = parentMessage.limit(); + parentMessage.limit(limit + headerLength + length); + buffer.putInt(limit, length, java.nio.ByteOrder.LITTLE_ENDIAN); + buffer.putBytes(limit + headerLength, src, srcOffset, length); + + return this; + } + + public RecordMetadataEncoderBefore83 rejectionReason(final String value) + { + final byte[] bytes = (null == value || value.isEmpty()) ? org.agrona.collections.ArrayUtil.EMPTY_BYTE_ARRAY : value.getBytes(java.nio.charset.StandardCharsets.UTF_8); + + final int length = bytes.length; + if (length > 2147483647) + { + throw new IllegalStateException("length > maxValue for type: " + length); + } + + final int headerLength = 4; + final int limit = parentMessage.limit(); + parentMessage.limit(limit + headerLength + length); + buffer.putInt(limit, length, java.nio.ByteOrder.LITTLE_ENDIAN); + buffer.putBytes(limit + headerLength, bytes, 0, length); + + return this; + } + + public String toString() + { + if (null == buffer) + { + return ""; + } + + return appendTo(new StringBuilder()).toString(); + } + + public StringBuilder appendTo(final StringBuilder builder) + { + if (null == buffer) + { + return builder; + } + + final RecordMetadataDecoderBefore83 decoder = new RecordMetadataDecoderBefore83(); + decoder.wrap(buffer, initialOffset, BLOCK_LENGTH, SCHEMA_VERSION); + + return decoder.appendTo(builder); + } +} diff --git a/backend/src/test/kotlin/io/zell/zdb/v81/Version81Test.java b/backend/src/test/kotlin/io/zell/zdb/v81/Version81Test.java index 3585ec9a..2d099b7b 100644 --- a/backend/src/test/kotlin/io/zell/zdb/v81/Version81Test.java +++ b/backend/src/test/kotlin/io/zell/zdb/v81/Version81Test.java @@ -22,6 +22,8 @@ import io.camunda.zeebe.model.bpmn.Bpmn; import io.camunda.zeebe.model.bpmn.BpmnModelInstance; import io.camunda.zeebe.protocol.ZbColumnFamilies; +import io.camunda.zeebe.protocol.record.RejectionType; +import io.camunda.zeebe.protocol.record.ValueType; import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent; import io.camunda.zeebe.protocol.record.value.BpmnElementType; import io.camunda.zeebe.protocol.record.value.ErrorType; @@ -43,7 +45,6 @@ import io.zell.zdb.state.incident.IncidentState; import io.zell.zdb.state.instance.InstanceState; import io.zell.zdb.state.process.ProcessState; -import io.zell.zdb.v82.Version82Test; import org.agrona.concurrent.UnsafeBuffer; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.*; @@ -59,16 +60,13 @@ import java.io.File; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Base64; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; +import java.util.*; +import java.util.stream.StreamSupport; import static io.zell.zdb.TestUtils.CONTAINER_PATH; +import static io.zell.zdb.TestUtils.TIMESTAMP_REGEX; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.InstanceOfAssertFactories.LONG; @Testcontainers @TestInstance(TestInstance.Lifecycle.PER_CLASS) @@ -131,9 +129,9 @@ public void shouldReadStatusFromLog() { final var status = logStatus.status(); // then - assertThat(status.getHighestIndex()).isEqualTo(222); + assertThat(status.getHighestIndex()).isEqualTo(224); assertThat(status.getHighestTerm()).isEqualTo(1); - assertThat(status.getHighestRecordPosition()).isEqualTo(258); + assertThat(status.getHighestRecordPosition()).isEqualTo(260); assertThat(status.getLowestIndex()).isEqualTo(1); assertThat(status.getLowestRecordPosition()).isEqualTo(1); @@ -186,9 +184,9 @@ public void shouldReadStatusFromLog() { final var status = logStatus.status(); // then - assertThat(status.getHighestIndex()).isEqualTo(13); + assertThat(status.getHighestIndex()).isEqualTo(15); assertThat(status.getHighestTerm()).isEqualTo(1); - assertThat(status.getHighestRecordPosition()).isEqualTo(60); + assertThat(status.getHighestRecordPosition()).isEqualTo(62); assertThat(status.getLowestIndex()).isEqualTo(1); assertThat(status.getLowestRecordPosition()).isEqualTo(1); @@ -242,6 +240,99 @@ public void shouldReadLogContentWithIterator() { verifyCompleteLog(records); } + @Test + public void shouldReadRejection() { + // given + final var logPath = ZeebePaths.Companion.getLogPath(TEMP_DIR, "1"); + var logContentReader = new LogContentReader(logPath); + + // when + logContentReader.filterForRejections(); + + // then + final var rejection = StreamSupport.stream(Spliterators.spliteratorUnknownSize(logContentReader, Spliterator.ORDERED), false) + .filter(persistedRecord -> persistedRecord instanceof ApplicationRecord) + .map(persistedRecord -> (ApplicationRecord) persistedRecord) + .flatMap(applicationRecord -> applicationRecord.getEntries().stream()) + .filter(record -> record.component8() != RejectionType.NULL_VAL) + .findFirst(); + + assertThat(rejection).isPresent(); + assertThat(rejection.get().component8()).isEqualTo(RejectionType.NOT_FOUND); + assertThat(rejection.get().component9()).isEqualTo("Expected to find process definition with process ID 'nonExisting', but none found"); + } + + @Test + public void shouldSerializeRejectionToJson() throws JsonProcessingException { + // given + final var expectedJson = OBJECT_MAPPER.readTree(""" + {"position":62,"sourceRecordPosition":61,"key":-1,"recordType":"COMMAND_REJECTION", + "valueType":"PROCESS_INSTANCE_CREATION","intent":"CREATE","rejectionType":"NOT_FOUND", + "rejectionReason":"Expected to find process definition with process ID 'nonExisting', but none found", + "requestId":-1,"requestStreamId":-2147483648,"protocolVersion":3,"brokerVersion":"8.1.18", + "recordValue":{"bpmnProcessId":"nonExisting","processDefinitionKey":0,"processInstanceKey":-1, + "version":-1,"variables":"gA==","fetchVariables":[], + "startInstructions":[]}} + } +"""); + final var logPath = ZeebePaths.Companion.getLogPath(TEMP_DIR, "1"); + var logContentReader = new LogContentReader(logPath); + + // when + logContentReader.filterForRejections(); + + // then + final var rejection = StreamSupport.stream(Spliterators.spliteratorUnknownSize(logContentReader, Spliterator.ORDERED), false) + .filter(persistedRecord -> persistedRecord instanceof ApplicationRecord) + .map(persistedRecord -> (ApplicationRecord) persistedRecord) + .flatMap(applicationRecord -> applicationRecord.getEntries().stream()) + .filter(record -> !record.component8().equals(RejectionType.NULL_VAL.name())) + .findFirst(); + + assertThat(rejection).isPresent(); + assertThat(rejection.get().component8()).isEqualTo(RejectionType.NOT_FOUND); + assertThat(rejection.get().component9()).isEqualTo("Expected to find process definition with process ID 'nonExisting', but none found"); + + final var recordJson = rejection.get().toString() + .replaceFirst(TIMESTAMP_REGEX, ""); + final var actualJson = OBJECT_MAPPER.readTree(recordJson); + assertThat(actualJson).isNotNull(); // is valid json + assertThat(actualJson).isEqualTo(expectedJson); + } + + @Test + public void shouldSerializeRecordToJson() throws JsonProcessingException { + // given + final var expectedJson = OBJECT_MAPPER.readTree(""" + {"position":13,"sourceRecordPosition":6,"key":2251799813685252,"recordType":"EVENT", + "valueType":"PROCESS_INSTANCE","intent":"ELEMENT_ACTIVATED","requestId":-1, + "requestStreamId":-2147483648,"protocolVersion":3,"brokerVersion":"8.1.18", + "recordValue":{"bpmnProcessId":"process","version":1,"processDefinitionKey":2251799813685249, + "processInstanceKey":2251799813685252,"elementId":"process","flowScopeKey":-1, + "bpmnElementType":"PROCESS","parentProcessInstanceKey":-1,"parentElementInstanceKey":-1}}"""); + final var logPath = ZeebePaths.Companion.getLogPath(TEMP_DIR, "1"); + var logContentReader = new LogContentReader(logPath); + logContentReader.filterForProcessInstance(zeebeContentCreator.processInstanceEvent.getProcessInstanceKey()); + + // when + final var piActivated = StreamSupport.stream(Spliterators.spliteratorUnknownSize(logContentReader, Spliterator.ORDERED), false) + .filter(persistedRecord -> persistedRecord instanceof ApplicationRecord) + .map(persistedRecord -> (ApplicationRecord) persistedRecord) + .flatMap(applicationRecord -> applicationRecord.getEntries().stream()) + .filter(record -> record.component6() == ValueType.PROCESS_INSTANCE) + .filter(record -> record.component7() == ProcessInstanceIntent.ELEMENT_ACTIVATED) + .filter(record -> record.getPiRelatedValue() != null) + .filter(record -> record.getPiRelatedValue().getBpmnElementType() == BpmnElementType.PROCESS) + .findFirst(); + + // then + assertThat(piActivated).isPresent(); + final var recordJson = piActivated.get().toString() + .replaceFirst(TIMESTAMP_REGEX, ""); + final var actualJson = OBJECT_MAPPER.readTree(recordJson); + assertThat(actualJson).isNotNull(); // is valid json + assertThat(actualJson).isEqualTo(expectedJson); + } @Test public void shouldSkipFirstPartOfLog() { // given @@ -254,13 +345,13 @@ public void shouldSkipFirstPartOfLog() { logContentReader.forEachRemaining(records::add); // then - assertThat(records).hasSize(9); + assertThat(records).hasSize(11); // we skip the first raft record assertThat(records.stream().filter(RaftRecord.class::isInstance).count()).isEqualTo(0); - assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(9); + assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(11); final var maxIndex = records.stream().map(PersistedRecord::index).max(Long::compareTo).get(); - assertThat(maxIndex).isEqualTo(13); + assertThat(maxIndex).isEqualTo(15); final var minIndex = records.stream().map(PersistedRecord::index).min(Long::compareTo).get(); assertThat(minIndex).isEqualTo(5); @@ -270,7 +361,7 @@ public void shouldSkipFirstPartOfLog() { .map(ApplicationRecord::getHighestPosition) .max(Long::compareTo) .orElseThrow(); - assertThat(maxPosition).isEqualTo(60); + assertThat(maxPosition).isEqualTo(62); final var minPosition = records.stream() .filter(ApplicationRecord.class::isInstance) .map(ApplicationRecord.class::cast) @@ -327,9 +418,9 @@ public void shouldSeekToEndOfLogIfNoExistingSeek() { assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(1); final var maxIndex = records.stream().map(PersistedRecord::index).max(Long::compareTo).get(); - assertThat(maxIndex).isEqualTo(13); + assertThat(maxIndex).isEqualTo(15); final var minIndex = records.stream().map(PersistedRecord::index).min(Long::compareTo).get(); - assertThat(minIndex).isEqualTo(13); + assertThat(minIndex).isEqualTo(15); final var maxPosition = records.stream() .filter(ApplicationRecord.class::isInstance) @@ -337,14 +428,14 @@ public void shouldSeekToEndOfLogIfNoExistingSeek() { .map(ApplicationRecord::getHighestPosition) .max(Long::compareTo) .orElseThrow(); - assertThat(maxPosition).isEqualTo(60); + assertThat(maxPosition).isEqualTo(62); final var minPosition = records.stream() .filter(ApplicationRecord.class::isInstance) .map(ApplicationRecord.class::cast) .map(ApplicationRecord::getLowestPosition) .min(Long::compareTo) .orElseThrow(); - assertThat(minPosition).isEqualTo(60); + assertThat(minPosition).isEqualTo(62); } @Test @@ -602,12 +693,12 @@ public void shouldFilterWithProcessInstanceKeyAndSetBeginAndEndOfLogPosition() { } private static void verifyCompleteLog(List records) { - assertThat(records).hasSize(13); + assertThat(records).hasSize(15); assertThat(records.stream().filter(RaftRecord.class::isInstance).count()).isEqualTo(1); - assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(12); + assertThat(records.stream().filter(ApplicationRecord.class::isInstance).count()).isEqualTo(14); final var maxIndex = records.stream().map(PersistedRecord::index).max(Long::compareTo).get(); - assertThat(maxIndex).isEqualTo(13); + assertThat(maxIndex).isEqualTo(15); final var minIndex = records.stream().map(PersistedRecord::index).min(Long::compareTo).get(); assertThat(minIndex).isEqualTo(1); @@ -617,7 +708,7 @@ private static void verifyCompleteLog(List records) { .map(ApplicationRecord::getHighestPosition) .max(Long::compareTo) .orElseThrow(); - assertThat(maxPosition).isEqualTo(60); + assertThat(maxPosition).isEqualTo(62); final var minPosition = records.stream() .filter(ApplicationRecord.class::isInstance) .map(ApplicationRecord.class::cast) From d187953ad12e5d5810d668c2052ff1b354ef5bcb Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Sat, 4 Nov 2023 21:19:56 +0100 Subject: [PATCH 07/11] fix: read metadata for different versions Use old record metadata for previous versions use current version for 8.3+ --- .../io/zell/zdb/log/LogContentReader.kt | 88 ++++++++++++++----- .../log/records/IndexedRaftLogEntryImpl.java | 7 -- .../kotlin/io/zell/zdb/log/records/Record.kt | 4 +- 3 files changed, 66 insertions(+), 33 deletions(-) diff --git a/backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt b/backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt index a7897f25..d4ad9f54 100644 --- a/backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt +++ b/backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt @@ -20,9 +20,10 @@ import io.camunda.zeebe.logstreams.impl.log.LoggedEventImpl import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter import io.camunda.zeebe.protocol.impl.record.RecordMetadata import io.camunda.zeebe.protocol.record.RecordType -import io.camunda.zeebe.protocol.record.RejectionType import io.zell.zdb.log.records.* +import io.zell.zdb.log.records.old.RecordMetadataBefore83 import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonElement import org.agrona.concurrent.UnsafeBuffer import java.nio.file.Path import kotlin.streams.asStream @@ -98,33 +99,12 @@ class LogContentReader(logPath: Path) : Iterator { do { val loggedEvent = LoggedEventImpl(); val metadata = RecordMetadata(); + metadata.reset() loggedEvent.wrap(readBuffer, offset) loggedEvent.readMetadata(metadata) - val valueJson = MsgPackConverter.convertToJson( - UnsafeBuffer(loggedEvent.valueBuffer, loggedEvent.valueOffset, loggedEvent.valueLength)) - - val parsedRecord = Record( - loggedEvent.position, - loggedEvent.sourceEventPosition, - loggedEvent.timestamp, - loggedEvent.key, - metadata.recordType, - metadata.valueType, - metadata.intent, - metadata.rejectionType, - metadata.rejectionReason, - metadata.requestId, - metadata.requestStreamId, - metadata.protocolVersion, - metadata.brokerVersion.toString(), - metadata.recordVersion, - metadata.authorization.authData.toString(), - Json.decodeFromString(valueJson), - json.decodeFromString(valueJson) ) - - + val parsedRecord: Record = readRecord(loggedEvent, metadata) applicationRecord.entries.add(parsedRecord) offset += loggedEvent.getLength(); @@ -135,6 +115,66 @@ class LogContentReader(logPath: Path) : Iterator { } } + private fun readRecord( + loggedEvent: LoggedEventImpl, + metadata: RecordMetadata + ): Record { + val valueJson = MsgPackConverter.convertToJson( + UnsafeBuffer(loggedEvent.valueBuffer, loggedEvent.valueOffset, loggedEvent.valueLength) + ) + val recordValue: JsonElement = + Json.decodeFromString(valueJson) + val pInstanceRelatedValue = + json.decodeFromString(valueJson) + + val parsedRecord: Record; + if (metadata.protocolVersion >= 4) { + parsedRecord = Record( + loggedEvent.position, + loggedEvent.sourceEventPosition, + loggedEvent.timestamp, + loggedEvent.key, + metadata.recordType, + metadata.valueType, + metadata.intent, + metadata.rejectionType, + metadata.rejectionReason, + metadata.requestId, + metadata.requestStreamId, + metadata.protocolVersion, + metadata.brokerVersion.toString(), + metadata.recordVersion, + metadata.authorization.authData.toString(), + recordValue, + pInstanceRelatedValue + ) + } else { + val recordMetadataBefore83 = RecordMetadataBefore83() + loggedEvent.readMetadata(recordMetadataBefore83) + + parsedRecord = Record( + loggedEvent.position, + loggedEvent.sourceEventPosition, + loggedEvent.timestamp, + loggedEvent.key, + recordMetadataBefore83.recordType, + recordMetadataBefore83.valueType, + recordMetadataBefore83.intent, + recordMetadataBefore83.rejectionType, + recordMetadataBefore83.rejectionReason, + recordMetadataBefore83.requestId, + recordMetadataBefore83.requestStreamId, + recordMetadataBefore83.protocolVersion, + recordMetadataBefore83.brokerVersion.toString(), + 0, + "", + recordValue, + pInstanceRelatedValue + ) + } + return parsedRecord + } + fun readAll(): LogContent { val logContent = LogContent() this.forEach { diff --git a/backend/src/main/kotlin/io/zell/zdb/log/records/IndexedRaftLogEntryImpl.java b/backend/src/main/kotlin/io/zell/zdb/log/records/IndexedRaftLogEntryImpl.java index d62e60cc..bb6606f6 100644 --- a/backend/src/main/kotlin/io/zell/zdb/log/records/IndexedRaftLogEntryImpl.java +++ b/backend/src/main/kotlin/io/zell/zdb/log/records/IndexedRaftLogEntryImpl.java @@ -18,7 +18,6 @@ package io.zell.zdb.log.records; import io.atomix.raft.protocol.PersistedRaftRecord; -import io.atomix.raft.protocol.ReplicatableJournalRecord; import io.atomix.raft.storage.log.entry.ApplicationEntry; import io.atomix.raft.storage.log.entry.RaftEntry; import io.zell.zdb.journal.ReadOnlyJournalRecord; @@ -51,12 +50,6 @@ public PersistedRaftRecord getPersistedRaftRecord() { return new PersistedRaftRecord(this.term, this.index, this.record.asqn(), this.record.checksum(), serializedRaftLogEntry); } - public ReplicatableJournalRecord getReplicatableJournalRecord() { - byte[] serializedRecord = new byte[this.record.data().capacity()]; - this.record.data().getBytes(0, serializedRecord); - return new ReplicatableJournalRecord(this.term, this.index, this.record.checksum(), serializedRecord); - } - public long index() { return this.index; } diff --git a/backend/src/main/kotlin/io/zell/zdb/log/records/Record.kt b/backend/src/main/kotlin/io/zell/zdb/log/records/Record.kt index 209c4ee4..a202b38e 100644 --- a/backend/src/main/kotlin/io/zell/zdb/log/records/Record.kt +++ b/backend/src/main/kotlin/io/zell/zdb/log/records/Record.kt @@ -40,8 +40,8 @@ data class Record(val position: Long, val requestStreamId: Int = 0, val protocolVersion: Int, val brokerVersion: String, - val recordVersion: Int, - val authData: String, + val recordVersion: Int ? = 0, + val authData: String ? = "", val recordValue: JsonElement, /*Transient marks to ignore the property during serialization */ @Transient val piRelatedValue: ProcessInstanceRelatedValue? = null From 77bb9dddef521fbf163dff140298d819f9a0bad0 Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Sat, 4 Nov 2023 21:25:14 +0100 Subject: [PATCH 08/11] test: adjust tests to fixed version read Old metadata is now used for reading which allows to read BrokerVersion and rejection type to read correctly --- .../io/zell/zdb/latest/VersionLatestTest.java | 3 +-- .../kotlin/io/zell/zdb/v82/Version82Test.java | 16 +++++++--------- .../kotlin/io/zell/zdb/v83/Version83Test.java | 3 +-- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/backend/src/test/kotlin/io/zell/zdb/latest/VersionLatestTest.java b/backend/src/test/kotlin/io/zell/zdb/latest/VersionLatestTest.java index c4334649..e3584091 100644 --- a/backend/src/test/kotlin/io/zell/zdb/latest/VersionLatestTest.java +++ b/backend/src/test/kotlin/io/zell/zdb/latest/VersionLatestTest.java @@ -273,7 +273,7 @@ public void shouldSerializeRejectionToJson() throws JsonProcessingException { "valueType":"PROCESS_INSTANCE_CREATION","intent":"CREATE","rejectionType":"NOT_FOUND", "rejectionReason":"Expected to find process definition with process ID 'nonExisting', but none found", "requestId":-1,"requestStreamId":-2147483648,"protocolVersion":4,"brokerVersion":"8.4.0", - "recordVersion":1,"authData":"", + "recordVersion":1, "recordValue":{"bpmnProcessId":"nonExisting","processDefinitionKey":0,"processInstanceKey":-1, "version":-1,"variables":"gA==","fetchVariables":[], "startInstructions":[],"tenantId":""}} @@ -311,7 +311,6 @@ public void shouldSerializeRecordToJson() throws JsonProcessingException { {"position":12,"sourceRecordPosition":5,"key":2251799813685252,"recordType":"EVENT", "valueType":"PROCESS_INSTANCE","intent":"ELEMENT_ACTIVATED","requestId":-1, "requestStreamId":-2147483648,"protocolVersion":4,"brokerVersion":"8.4.0","recordVersion":1, - "authData":"", "recordValue":{"bpmnElementType":"PROCESS","elementId":"process","bpmnProcessId":"process", "version":1,"processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685252, "flowScopeKey":-1,"bpmnEventType":"UNSPECIFIED","parentProcessInstanceKey":-1, diff --git a/backend/src/test/kotlin/io/zell/zdb/v82/Version82Test.java b/backend/src/test/kotlin/io/zell/zdb/v82/Version82Test.java index 814a0a35..653bd17e 100644 --- a/backend/src/test/kotlin/io/zell/zdb/v82/Version82Test.java +++ b/backend/src/test/kotlin/io/zell/zdb/v82/Version82Test.java @@ -258,7 +258,7 @@ public void shouldReadRejection() { .findFirst(); assertThat(rejection).isPresent(); - assertThat(rejection.get().component8()).isEqualTo(RejectionType.INVALID_ARGUMENT); + assertThat(rejection.get().component8()).isEqualTo(RejectionType.NOT_FOUND); assertThat(rejection.get().component9()).isEqualTo("Expected to find process definition with process ID 'nonExisting', but none found"); } @@ -267,10 +267,9 @@ public void shouldSerializeRejectionToJson() throws JsonProcessingException { // given final var expectedJson = OBJECT_MAPPER.readTree(""" {"position":62,"sourceRecordPosition":61,"key":-1,"recordType":"COMMAND_REJECTION", - "valueType":"PROCESS_INSTANCE_CREATION","intent":"CREATE","rejectionType":"INVALID_ARGUMENT", + "valueType":"PROCESS_INSTANCE_CREATION","intent":"CREATE","rejectionType":"NOT_FOUND", "rejectionReason":"Expected to find process definition with process ID 'nonExisting', but none found", - "requestId":-1,"requestStreamId":-2147483648,"protocolVersion":3,"brokerVersion":"2049.512.4096", - "recordVersion":20736,"authData":"", + "requestId":-1,"requestStreamId":-2147483648,"protocolVersion":3,"brokerVersion":"8.2.16", "recordValue":{"bpmnProcessId":"nonExisting","processDefinitionKey":0,"processInstanceKey":-1, "version":-1,"variables":"gA==","fetchVariables":[], "startInstructions":[]}} @@ -287,11 +286,11 @@ public void shouldSerializeRejectionToJson() throws JsonProcessingException { .filter(persistedRecord -> persistedRecord instanceof ApplicationRecord) .map(persistedRecord -> (ApplicationRecord) persistedRecord) .flatMap(applicationRecord -> applicationRecord.getEntries().stream()) - .filter(record -> !record.component8().equals(RejectionType.NULL_VAL.name())) + .filter(record -> record.component8() != RejectionType.NULL_VAL) .findFirst(); assertThat(rejection).isPresent(); - assertThat(rejection.get().component8()).isEqualTo(RejectionType.INVALID_ARGUMENT); + assertThat(rejection.get().component8()).isEqualTo(RejectionType.NOT_FOUND); assertThat(rejection.get().component9()).isEqualTo("Expected to find process definition with process ID 'nonExisting', but none found"); final var recordJson = rejection.get().toString() @@ -306,9 +305,8 @@ public void shouldSerializeRecordToJson() throws JsonProcessingException { // given final var expectedJson = OBJECT_MAPPER.readTree(""" {"position":13,"sourceRecordPosition":6,"key":2251799813685252,"recordType":"EVENT", - "valueType":"PROCESS_INSTANCE","intent":"ELEMENT_ACTIVATED","rejectionType":"INVALID_ARGUMENT", - "requestId":-1,"requestStreamId":-2147483648,"protocolVersion":3,"brokerVersion":"2303.512.4096", - "recordVersion":1,"authData":"", + "valueType":"PROCESS_INSTANCE","intent":"ELEMENT_ACTIVATED", + "requestId":-1,"requestStreamId":-2147483648,"protocolVersion":3,"brokerVersion":"8.2.16", "recordValue":{"bpmnElementType":"PROCESS","elementId":"process","bpmnProcessId":"process", "version":1,"processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685252, "flowScopeKey":-1,"bpmnEventType":"UNSPECIFIED","parentProcessInstanceKey":-1, diff --git a/backend/src/test/kotlin/io/zell/zdb/v83/Version83Test.java b/backend/src/test/kotlin/io/zell/zdb/v83/Version83Test.java index 3d152cbd..5fbd6bc5 100644 --- a/backend/src/test/kotlin/io/zell/zdb/v83/Version83Test.java +++ b/backend/src/test/kotlin/io/zell/zdb/v83/Version83Test.java @@ -273,7 +273,7 @@ public void shouldSerializeRejectionToJson() throws JsonProcessingException { "valueType":"PROCESS_INSTANCE_CREATION","intent":"CREATE","rejectionType":"NOT_FOUND", "rejectionReason":"Expected to find process definition with process ID 'nonExisting', but none found", "requestId":-1,"requestStreamId":-2147483648,"protocolVersion":4,"brokerVersion":"8.3.0", - "recordVersion":1,"authData":"", + "recordVersion":1, "recordValue":{"bpmnProcessId":"nonExisting","processDefinitionKey":0,"processInstanceKey":-1, "version":-1,"variables":"gA==","fetchVariables":[], "startInstructions":[],"tenantId":""}} @@ -311,7 +311,6 @@ public void shouldSerializeRecordToJson() throws JsonProcessingException { {"position":12,"sourceRecordPosition":5,"key":2251799813685252,"recordType":"EVENT", "valueType":"PROCESS_INSTANCE","intent":"ELEMENT_ACTIVATED","requestId":-1, "requestStreamId":-2147483648,"protocolVersion":4,"brokerVersion":"8.3.0","recordVersion":1, - "authData":"", "recordValue":{"bpmnElementType":"PROCESS","elementId":"process","bpmnProcessId":"process", "version":1,"processDefinitionKey":2251799813685249,"processInstanceKey":2251799813685252, "flowScopeKey":-1,"bpmnEventType":"UNSPECIFIED","parentProcessInstanceKey":-1, From 9162c702e84496aa02e09810a6c3e73b7d790935 Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Sat, 4 Nov 2023 21:26:36 +0100 Subject: [PATCH 09/11] build: add license --- .../old/RecordMetadataDecoderBefore83.java | 15 +++++++++++++++ .../old/RecordMetadataEncoderBefore83.java | 15 +++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataDecoderBefore83.java b/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataDecoderBefore83.java index 282e76d5..a1327b94 100644 --- a/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataDecoderBefore83.java +++ b/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataDecoderBefore83.java @@ -1,3 +1,18 @@ +/* + * Copyright © 2021 Christopher Kujawa (zelldon91@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ /* Generated SBE (Simple Binary Encoding) message codec. */ package io.zell.zdb.log.records.old; diff --git a/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataEncoderBefore83.java b/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataEncoderBefore83.java index 58baebd1..1828bed2 100644 --- a/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataEncoderBefore83.java +++ b/backend/src/main/kotlin/io/zell/zdb/log/records/old/RecordMetadataEncoderBefore83.java @@ -1,3 +1,18 @@ +/* + * Copyright © 2021 Christopher Kujawa (zelldon91@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ /* Generated SBE (Simple Binary Encoding) message codec. */ package io.zell.zdb.log.records.old; From 466c92da9bf0236e0945c1478743d3122a1369e5 Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Sat, 4 Nov 2023 21:34:24 +0100 Subject: [PATCH 10/11] style: add line at end of file --- .../src/main/kotlin/io/zell/zdb/log/records/IntentSerializer.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/src/main/kotlin/io/zell/zdb/log/records/IntentSerializer.kt b/backend/src/main/kotlin/io/zell/zdb/log/records/IntentSerializer.kt index 73b036e4..17e398af 100644 --- a/backend/src/main/kotlin/io/zell/zdb/log/records/IntentSerializer.kt +++ b/backend/src/main/kotlin/io/zell/zdb/log/records/IntentSerializer.kt @@ -37,4 +37,4 @@ class IntentSerializer : KSerializer { override fun serialize(encoder: Encoder, value: Intent) { encoder.encodeString(value.name()) } -} \ No newline at end of file +} From d3d382d4299f19175b880ac0d73aadf832966ffb Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Sat, 4 Nov 2023 21:34:35 +0100 Subject: [PATCH 11/11] style: introduce const --- backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt b/backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt index d4ad9f54..f30afc22 100644 --- a/backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt +++ b/backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt @@ -28,6 +28,8 @@ import org.agrona.concurrent.UnsafeBuffer import java.nio.file.Path import kotlin.streams.asStream +private const val PROTOCOL_VERSION_83 = 4 + class LogContentReader(logPath: Path) : Iterator { private val json = Json { ignoreUnknownKeys = true } @@ -128,7 +130,7 @@ class LogContentReader(logPath: Path) : Iterator { json.decodeFromString(valueJson) val parsedRecord: Record; - if (metadata.protocolVersion >= 4) { + if (metadata.protocolVersion >= PROTOCOL_VERSION_83) { parsedRecord = Record( loggedEvent.position, loggedEvent.sourceEventPosition,