
protobuf schema doesn't parse protobuf wire protocol correctly. #152

Open
ideasculptor opened this issue Sep 23, 2021 · 18 comments · May be fixed by #258

Comments

@ideasculptor

There are a whole slew of problems. First and foremost, it doesn't honor the wire format definition - partly because the docs are structured ridiculously. They document the wire format for Avro in a table, making it appear that the table is the whole wire format. But after the table there's a paragraph describing how protobuf adds an extra array of descriptor indexes to the header, before the protobuf data. Look after the green box on this page: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format

The protobuf schema implementation completely ignores the array of message-descriptor indexes, instead just searching the buffer for the first non-zero byte. But even the first message in a .proto file can be encoded with the array [1, 0] instead of just a 0 - that is, an array of length 1 containing message index 0, rather than an array of length 0 with an assumed message index of 0. Note also that the integers in the array, including the length, are zigzag-encoded varints, not fixed-width network-byte-order integers, so you have to parse them correctly too and cannot assume 4 bytes per integer.
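
To make that concrete, here is a minimal sketch (using protobufjs's BufferWriter and Reader, the same classes as in the code further down; the variable names are just for illustration) of the two equivalent encodings of the index array for the first message in a schema:

    import { BufferWriter, Reader } from 'protobufjs'

    // shortcut form: array length 0, message index 0 implied - a single 0x00 byte
    const short = new BufferWriter().sint32(0).finish()

    // explicit form: array length 1, message index 0 - two bytes [0x02, 0x00]
    // (0x02 is the zigzag varint encoding of 1)
    const explicit = new BufferWriter().sint32(1).sint32(0).finish()

    // a correct parser has to handle both forms
    const reader = Reader.create(explicit)
    const arrayLen = reader.sint32()
    const msgIndexes: number[] = []
    for (let i = 0; i < arrayLen; i++) msgIndexes.push(reader.sint32())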

Then there's the problem that things other than messages (Types) can be declared within a .proto file. So even if the indexes were being honored in the recursive loop implemented by getNestedTypeName(), the exit condition only checks for Type and Namespace, yet it is entirely possible to encounter an Enum, a Service, or a Method. It is therefore necessary to iterate over the keys in the parent until you find either a Namespace or a Type, and then continue traversing the descriptor hierarchy from there, rather than always assuming the first key is the one to use.

This is relatively simple to implement: just iterate over the keys until you find one which is an instanceof Type or Namespace, as in the sketch below.
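
Something along these lines (a sketch, not the library's actual code; the function name findTypeOrNamespaceKey is made up here, and it assumes protobufjs's reflection classes):

    import { Namespace, Type } from 'protobufjs'

    // scan the nested declarations, skipping Enums, Services, Methods, etc.,
    // and return the first key that is a message Type or a Namespace
    function findTypeOrNamespaceKey(nested: { [k: string]: unknown }): string {
        for (const key of Object.keys(nested)) {
            const obj = nested[key]
            if (obj instanceof Type || obj instanceof Namespace) return key
        }
        throw new Error('no Type or Namespace found among nested declarations')
    }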

Finally, the code assumes that there is a parsedMessage.package field, which may be there when parsing a .proto string, but is definitely NOT there when parsing a JSON protobuf descriptor. When the root is created from a JSON descriptor, you have to derive the package name by iterating through the Namespace declarations in the parent hierarchy. It'd be great if the package made it into the JSON descriptor, but until it does, it is probably safer to determine the package name dynamically rather than taking it from the parser, since it could disappear from parsed protos as easily as it could be added to JSON descriptors.

    // recurse through Namespaces (but not message Types), accumulating the
    // fully qualified name; once a Type is reached, return its key
    if (reflection instanceof Namespace && !(reflection instanceof Type) && reflection.nested)
        return reflection.name + '.' + this.getNestedTypeName(reflection.nested)
    return keys[0]

This will return the fully qualified name without relying on the package string.

I'm going to take a stab at fixing the code in ProtoSchema.ts and submitting a PR to fix all of that (along with the corresponding changes in the serializer, which will need to generate the correct index array by walking the descriptor hierarchy).

@ideasculptor (Author)

It's also worth pointing out that the current implementation will not work at all for any protobuf that references another protobuf via an import statement, since it parses only the schema stored by the registry and not any schemas referenced by that schema.

@xnog commented Oct 1, 2021

+1

It's missing the "message-indexes" part.


@ideasculptor (Author) commented Oct 1, 2021

I have code which encodes it correctly. It isn't integrated with the schema registry client - I built a separate serializer, so I use the schema registry client only to talk to the registry. The code looks like this:

    private encodePayloadHeader(schemaInfo: SchemaInfo, msgIndexes: number[]): ProtoBuffer {
        const writer = new BufferWriter()
            .uint32(0) // magic byte: uint32 is a varint (no zigzag), so this writes a single 0x00 byte
            .fixed32(NetworkOrder(schemaInfo.schemaId)) // byte-swap to big-endian, then write 4 bytes
            .sint32(msgIndexes.length) // zigzag-encoded array length

        for (const msgIndex of msgIndexes) {
            writer.sint32(msgIndex) // zigzag-encoded index value
        }
        return writer.finish()
    }
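
NetworkOrder (and HostOrder in the decoder below) are small byte-order helpers I haven't pasted here. A plausible stand-in - an assumption, not the original code - is a plain 4-byte swap, since protobufjs's fixed32 writes and reads little-endian while the wire format stores the schema id big-endian:

    // hypothetical stand-ins for the helpers used above (not the original code):
    // protobufjs's fixed32 is little-endian, the wire format's schema id is
    // big-endian, so both directions are the same 4-byte swap
    const byteSwap32 = (n: number): number =>
        (((n & 0xff) << 24) | ((n & 0xff00) << 8) | ((n >>> 8) & 0xff00) | ((n >>> 24) & 0xff)) >>> 0
    const NetworkOrder = byteSwap32
    const HostOrder = byteSwap32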

I generate the full buffer more or less like this:

        const buf = this.encodePayloadHeader(schemaInfo, msgIndexes)
        const writer = schemaInfo.type.encode(thing)
        return Buffer.concat([buf, writer.finish()])

@ideasculptor (Author) commented Oct 1, 2021

On the decode side, it looks like this:

    private decodeHeader(topic: string, buffer: Buffer): ProtoInfo {
        const bufferReader = Reader.create(buffer)
        const magicByte = bufferReader.uint32() // varint, so the single 0x00 magic byte reads as 0
        const schemaId = HostOrder(bufferReader.fixed32()) // 4 bytes, swapped back to host order
        const arrayLen = bufferReader.sint32() // zigzag-encoded length of the message-index array
        const msgIndexes = new Array<number>(arrayLen)
        for (let i = 0; i < arrayLen; i++) {
            msgIndexes[i] = bufferReader.sint32() // zigzag-encoded index values
        }
        return {
            magicByte: magicByte,
            schemaId: schemaId,
            msgIndexes: msgIndexes,
            bytesRead: bufferReader.pos, // total header length, so the caller can skip past it
        }
    }

    public async deserialize(topic: string, buffer: Buffer): Promise<Message<{}>> {
        if (buffer.length < 6) {
            // magic byte (1) + schema id (4) + at least 1 byte of message indexes
            throw new Error(`buffer with length ${buffer.length} is not long enough to contain a protobuf`)
        }
        const protoInfo = this.decodeHeader(topic, buffer)

        const type = await this.protobufResolver.ResolveProtobuf(topic, protoInfo.schemaId, protoInfo.msgIndexes)
        // re-read from the start and skip past the header we just parsed
        const bufferReader = Reader.create(buffer)
        bufferReader.skip(protoInfo.bytesRead)
        return type.decode(bufferReader)
    }

protobufResolver uses the registry client, the topic name, and the info parsed from the wire header to resolve a protobuf Type instance, which is then used to decode the payload. That allows me to inject whatever logic I want into the deserializer via protobufResolver for figuring out the type encoded in the payload, since correctly computing the message type from the message indexes isn't really possible with protobufjs as it is currently implemented - at least not if your .proto files import other protobufs, since the schema parsed out of the registry won't include the references. By delegating to a resolver, I can resort to quick hacks like hardcoding the type name based on the indexes and topic name, for example.
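
The shape of the resolver follows from the deserialize() call above; the hardcoded variant mentioned might look roughly like this (a sketch - HardcodedResolver is an illustration, not what anyone necessarily runs in production):

    import { Type } from 'protobufjs'

    // interface inferred from the deserialize() call above
    interface ProtobufResolver {
        ResolveProtobuf(topic: string, schemaId: number, msgIndexes: number[]): Promise<Type>
    }

    // the simplest quick hack: hardcode the protobuf Type per topic,
    // ignoring the schema id and message indexes entirely
    class HardcodedResolver implements ProtobufResolver {
        constructor(private readonly types: { [topic: string]: Type }) {}

        async ResolveProtobuf(topic: string, _schemaId: number, _msgIndexes: number[]): Promise<Type> {
            const type = this.types[topic]
            if (!type) throw new Error(`no protobuf Type registered for topic ${topic}`)
            return type
        }
    }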

@ideasculptor (Author) commented Oct 1, 2021

I'll probably open-source the serializer and deserializer I built, but it's not likely to happen for a week or two.

@e10101 commented Nov 21, 2021

Hi @ideasculptor, got the same issue. Looking forward to your fixes. Many thanks.

@tzahibena-simplex

Same problem, any ETA for a fix?

@MattMakes

Same issue, do we have an ETA for this fix?

@msempere

I'll probably open-source the serializer and deserializer I built, but it's not likely to happen for a week or two.

@ideasculptor have you had a chance to work on a PR to address this issue? I recently started using kafkajs/confluent-schema-registry and I can confirm that the library works for JSON and AVRO types, but fails for PROTOBUF.

@nicoga-payu-gpo

Any update on this?

@NikitaKemarskiyLeia

@ideasculptor Do you plan to proceed with it?

@ideasculptor (Author) commented Mar 28, 2023

Oops, I wrote a comment thinking this was the golang version of the client. @NikitaKemarskiyLeia - it seems unlikely that I'm going to submit an actual PR given how long it has been since I touched this. But the code I pasted above has been working for us very well for quite some time and is cross-compatible with other kafka clients in other languages that do have full protobuf schema registry support, so you've got everything you need to implement your own serialization and deserialization.

Admittedly, copying and pasting those few dozen lines of code is sub-optimal, but having to dig back into this codebase and figure out how to integrate it properly for a PR would be a fair bit of work for me, and it is pretty far down my priority queue at the moment, so I wouldn't hold your breath.

@NikitaKemarskiyLeia commented Mar 29, 2023

@ideasculptor Thanks for the response, I'll probably work on this PR myself then. Your comment was also useful - I have applied your fix in my code until the bug is fixed in the library.

@Meet-Modi

Hi @NikitaKemarskiyLeia, did you get a chance to work on the PR?

@NikitaKemarskiyLeia

@Meet-Modi Unfortunately I have no time for such a contribution now :(
Using the solution provided by @ideasculptor

@itsdeekay2

@tulios @Nevon Looks like a lot of people are facing this issue when communicating data across different services, and it has been pending for more than 1.5 years. It would be great if this could be picked up in the next release, considering the impact. Thanks in advance.

@itsdeekay2

@ideasculptor could you please help with the definition of this.protobufResolver.ResolveProtobuf to get the type?

@mkoiev commented Apr 23, 2024

@ideasculptor hello, thank you a lot for sharing the workaround. Could you please advise how to fetch the message indexes, or share your codebase?

@NikitaKemarskiyLeia maybe you can help
