Skip to content

Commit

Permalink
Merge pull request #322 from Zelldon/ck-fix-versions-and-rejectiontype
Browse files Browse the repository at this point in the history
Fix reading of old records and rejectiontype
  • Loading branch information
ChrisKujawa authored Nov 4, 2023
2 parents 6e18607 + d3d382d commit ea945d4
Show file tree
Hide file tree
Showing 16 changed files with 2,246 additions and 168 deletions.
20 changes: 11 additions & 9 deletions backend/src/main/kotlin/io/zell/zdb/log/LogContent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,19 @@ class LogContent {
.append("\\n").append(entry.intent.name())

if (entry.valueType == ValueType.PROCESS_INSTANCE) {
val piRelatedValue = entry.recordValue.piRelatedValue
piRelatedValue.bpmnElementType?.let {
content.append("\\n").append(it)
}
val piRelatedValue = entry.piRelatedValue
piRelatedValue?.let {
piRelatedValue.bpmnElementType?.let {
content.append("\\n").append(it)
}

piRelatedValue.processInstanceKey?.let {
content.append("\\nPI Key: ").append(it)
}
piRelatedValue.processInstanceKey?.let {
content.append("\\nPI Key: ").append(it)
}

piRelatedValue.processDefinitionKey?.let {
content.append("\\nPD Key: ").append(it)
piRelatedValue.processDefinitionKey?.let {
content.append("\\nPD Key: ").append(it)
}
}
}

Expand Down
98 changes: 80 additions & 18 deletions backend/src/main/kotlin/io/zell/zdb/log/LogContentReader.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,17 @@ import io.atomix.raft.storage.log.entry.SerializedApplicationEntry
import io.camunda.zeebe.logstreams.impl.log.LoggedEventImpl
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter
import io.camunda.zeebe.protocol.impl.record.RecordMetadata
import io.camunda.zeebe.protocol.record.RecordType
import io.zell.zdb.log.records.*
import io.zell.zdb.log.records.old.RecordMetadataBefore83
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonElement
import org.agrona.concurrent.UnsafeBuffer
import java.nio.file.Path
import kotlin.streams.asStream

private const val PROTOCOL_VERSION_83 = 4

class LogContentReader(logPath: Path) : Iterator<PersistedRecord> {

private val json = Json { ignoreUnknownKeys = true }
Expand Down Expand Up @@ -96,27 +101,12 @@ class LogContentReader(logPath: Path) : Iterator<PersistedRecord> {
do {
val loggedEvent = LoggedEventImpl();
val metadata = RecordMetadata();
metadata.reset()

loggedEvent.wrap(readBuffer, offset)
loggedEvent.readMetadata(metadata)

val valueJson = MsgPackConverter.convertToJson(
UnsafeBuffer(loggedEvent.valueBuffer, loggedEvent.valueOffset, loggedEvent.valueLength))

val parsedRecord = Record(
loggedEvent.position,
loggedEvent.sourceEventPosition,
loggedEvent.timestamp,
loggedEvent.key,
metadata.recordType,
metadata.valueType,
metadata.intent,
metadata.brokerVersion.toString(),
metadata.recordVersion,
RecordValue(valueJson,
json.decodeFromString<ProcessInstanceRelatedValue>(valueJson) )
)

val parsedRecord: Record = readRecord(loggedEvent, metadata)
applicationRecord.entries.add(parsedRecord)

offset += loggedEvent.getLength();
Expand All @@ -127,6 +117,66 @@ class LogContentReader(logPath: Path) : Iterator<PersistedRecord> {
}
}

private fun readRecord(
loggedEvent: LoggedEventImpl,
metadata: RecordMetadata
): Record {
val valueJson = MsgPackConverter.convertToJson(
UnsafeBuffer(loggedEvent.valueBuffer, loggedEvent.valueOffset, loggedEvent.valueLength)
)
val recordValue: JsonElement =
Json.decodeFromString(valueJson)
val pInstanceRelatedValue =
json.decodeFromString<ProcessInstanceRelatedValue>(valueJson)

val parsedRecord: Record;
if (metadata.protocolVersion >= PROTOCOL_VERSION_83) {
parsedRecord = Record(
loggedEvent.position,
loggedEvent.sourceEventPosition,
loggedEvent.timestamp,
loggedEvent.key,
metadata.recordType,
metadata.valueType,
metadata.intent,
metadata.rejectionType,
metadata.rejectionReason,
metadata.requestId,
metadata.requestStreamId,
metadata.protocolVersion,
metadata.brokerVersion.toString(),
metadata.recordVersion,
metadata.authorization.authData.toString(),
recordValue,
pInstanceRelatedValue
)
} else {
val recordMetadataBefore83 = RecordMetadataBefore83()
loggedEvent.readMetadata(recordMetadataBefore83)

parsedRecord = Record(
loggedEvent.position,
loggedEvent.sourceEventPosition,
loggedEvent.timestamp,
loggedEvent.key,
recordMetadataBefore83.recordType,
recordMetadataBefore83.valueType,
recordMetadataBefore83.intent,
recordMetadataBefore83.rejectionType,
recordMetadataBefore83.rejectionReason,
recordMetadataBefore83.requestId,
recordMetadataBefore83.requestStreamId,
recordMetadataBefore83.protocolVersion,
recordMetadataBefore83.brokerVersion.toString(),
0,
"",
recordValue,
pInstanceRelatedValue
)
}
return parsedRecord
}

fun readAll(): LogContent {
val logContent = LogContent()
this.forEach {
Expand All @@ -153,11 +203,23 @@ class LogContentReader(logPath: Path) : Iterator<PersistedRecord> {
applicationRecordFilter = {
record : ApplicationRecord ->
record.entries.asSequence()
.map { it.recordValue.piRelatedValue }
.map { it.piRelatedValue }
.filter { it != null }
.map { it!! }
.filter { it.processInstanceKey != null }
.asStream()
.map { it.processInstanceKey }
.anyMatch(instanceKey::equals)
}
}

fun filterForRejections() {
applicationRecordFilter = {
record : ApplicationRecord ->
record.entries.asSequence()
.map { it.recordType }
.any { it == RecordType.COMMAND_REJECTION }
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,20 @@ class ApplicationRecord(val index: Long, val term: Long, val highestPosition: Lo
.append(separator)
.append(record.intent)

val piRelatedValue = record.recordValue.piRelatedValue
piRelatedValue.processInstanceKey?.let {
stringBuilder
.append(separator)
.append(it)
.append(separator)
}
val piRelatedValue = record.piRelatedValue
piRelatedValue?.let {
piRelatedValue.processInstanceKey?.let {
stringBuilder
.append(separator)
.append(it)
.append(separator)
}

piRelatedValue.bpmnElementType?.let {
stringBuilder
.append(it)
.append(separator)
piRelatedValue.bpmnElementType?.let {
stringBuilder
.append(it)
.append(separator)
}
}
return stringBuilder.toString()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package io.zell.zdb.log.records;

import io.atomix.raft.protocol.PersistedRaftRecord;
import io.atomix.raft.protocol.ReplicatableJournalRecord;
import io.atomix.raft.storage.log.entry.ApplicationEntry;
import io.atomix.raft.storage.log.entry.RaftEntry;
import io.zell.zdb.journal.ReadOnlyJournalRecord;
Expand Down Expand Up @@ -51,12 +50,6 @@ public PersistedRaftRecord getPersistedRaftRecord() {
return new PersistedRaftRecord(this.term, this.index, this.record.asqn(), this.record.checksum(), serializedRaftLogEntry);
}

public ReplicatableJournalRecord getReplicatableJournalRecord() {
byte[] serializedRecord = new byte[this.record.data().capacity()];
this.record.data().getBytes(0, serializedRecord);
return new ReplicatableJournalRecord(this.term, this.index, this.record.checksum(), serializedRecord);
}

public long index() {
return this.index;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright © 2021 Christopher Kujawa ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.zell.zdb.log.records

import io.camunda.zeebe.protocol.record.intent.Intent
import kotlinx.serialization.KSerializer
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.descriptors.buildClassSerialDescriptor
import kotlinx.serialization.descriptors.element
import kotlinx.serialization.encoding.Decoder
import kotlinx.serialization.encoding.Encoder

class IntentSerializer : KSerializer<Intent> {
override fun deserialize(decoder: Decoder): Intent {
// there is currently not easy way to deserialize, and we also don't need
TODO("Not yet implemented")
}

override val descriptor: SerialDescriptor
= buildClassSerialDescriptor("Intent") {
element<String>("intent")
}

override fun serialize(encoder: Encoder, value: Intent) {
encoder.encodeString(value.name())
}
}
23 changes: 19 additions & 4 deletions backend/src/main/kotlin/io/zell/zdb/log/records/Record.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,37 @@
package io.zell.zdb.log.records

import io.camunda.zeebe.protocol.record.RecordType
import io.camunda.zeebe.protocol.record.RejectionType
import io.camunda.zeebe.protocol.record.ValueType
import io.camunda.zeebe.protocol.record.intent.Intent
import kotlinx.serialization.Serializable
import kotlinx.serialization.Transient
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonElement

@Serializable
data class Record(val position: Long,
val sourceRecordPosition: Long,
val timestamp: Long,
val key: Long,
val recordType: RecordType,
val valueType: ValueType,
@Serializable(with = IntentSerializer::class)
val intent: Intent,
val rejectionType: RejectionType? = RejectionType.NULL_VAL,
val rejectionReason: String? = "",
val requestId: Long? = 0,
val requestStreamId: Int = 0,
val protocolVersion: Int,
val brokerVersion: String,
val recordVersion: Int,
val recordValue: RecordValue

val recordVersion: Int ? = 0,
val authData: String ? = "",
val recordValue: JsonElement,
/*Transient marks to ignore the property during serialization */
@Transient val piRelatedValue: ProcessInstanceRelatedValue? = null
) {
override fun toString(): String {
return """{"key": $key, "position": $position, "sourceRecordPosition": $sourceRecordPosition, "intent": "$intent", "recordType": "$recordType", "valueType": "$valueType", "timestamp": $timestamp, "recordVersion": $recordVersion, "brokerVersion": "$brokerVersion", "value": ${recordValue.valueJson}}"""
return Json.encodeToString(this)
}
}
18 changes: 0 additions & 18 deletions backend/src/main/kotlin/io/zell/zdb/log/records/RecordValue.kt

This file was deleted.

Loading

0 comments on commit ea945d4

Please sign in to comment.