Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cli): fix pub the wrong base64 formart message #1413

Merged
merged 1 commit into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions cli/src/lib/pub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,29 @@ import delay from '../utils/delay'
import { saveConfig, loadConfig } from '../utils/config'
import { loadSimulator } from '../utils/simulate'
import { serializeProtobufToBuffer } from '../utils/protobuf'
import convertPayload from '../utils/convertPayload'

const processPublishMessage = (
message: string | Buffer,
protobufPath: string | undefined,
protobufMessageName: string | undefined,
format: FormatType | undefined,
): Buffer | string => {
/*
* Pipeline for processing outgoing messages in two potential stages:
* 1. Format Conversion --> Applied if a format is specified, transforming the message into that format; if absent, the message retains its initial state.
* 2. Protobuf Serialization --> Engaged if both protobuf path and message name are present, encapsulating the message into a protobuf format; without these settings, the message circulates unchanged.
*/
const pipeline = [
(msg: string | Buffer) => (format ? convertPayload(Buffer.from(msg.toString()), format, 'encode') : msg),
(msg: string | Buffer) =>
protobufPath && protobufMessageName
? serializeProtobufToBuffer(msg.toString(), protobufPath, protobufMessageName)
: msg,
]

return pipeline.reduce((msg, transformer) => transformer(msg), message) as Buffer
}

const send = (
config: boolean | string | undefined,
Expand All @@ -29,8 +52,8 @@ const send = (
basicLog.connected()
const { topic, message, protobufPath, protobufMessageName, format } = pubOpts
basicLog.publishing()
let bufferMessage = serializeProtobufToBuffer(message, protobufPath, protobufMessageName, format)
client.publish(topic, bufferMessage, pubOpts.opts, (err) => {
const publishMessage = processPublishMessage(message, protobufPath, protobufMessageName, format)
client.publish(topic, publishMessage, pubOpts.opts, (err) => {
if (err) {
signale.warn(err)
} else {
Expand Down Expand Up @@ -74,9 +97,8 @@ const multisend = (
})
sender._write = (line, _enc, cb) => {
const { topic, opts, protobufPath, protobufMessageName, format } = pubOpts

let bufferMessage = serializeProtobufToBuffer(line.trim(), protobufPath, protobufMessageName, format)
client.publish(topic, bufferMessage, opts, cb)
const publishMessage = processPublishMessage(line.trim(), protobufPath, protobufMessageName, format)
client.publish(topic, publishMessage, opts, cb)
}

client.on('connect', () => {
Expand Down
39 changes: 32 additions & 7 deletions cli/src/lib/sub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,35 @@ import convertPayload from '../utils/convertPayload'
import { saveConfig, loadConfig } from '../utils/config'
import { deserializeBufferToProtobuf } from '../utils/protobuf'

const processReceivedMessage = (
payload: Buffer,
protobufPath: string | undefined,
protobufMessageName: string | undefined,
format: FormatType | undefined,
): string => {
let message: string | Buffer = payload
/*
* Pipeline for processing incoming messages, following two potential steps:
* 1. Protobuf Deserialization --> Utilized if both protobuf path and message name are defined, otherwise message passes as is.
* 2. Format Conversion --> Engaged if a format is defined, converting the message accordingly; if not defined, message passes unchanged.
*/
const pipeline = [
(msg: Buffer) =>
protobufPath && protobufMessageName
? deserializeBufferToProtobuf(msg, protobufPath, protobufMessageName, format)
: msg,
(msg: Buffer) => (format ? convertPayload(msg, format, 'decode') : msg),
]

message = pipeline.reduce((msg, transformer) => transformer(msg), message)

if (Buffer.isBuffer(message)) {
message = message.toString('utf-8')
}

return message
}

const sub = (options: SubscribeOptions) => {
const { save, config } = options

Expand Down Expand Up @@ -66,12 +95,8 @@ const sub = (options: SubscribeOptions) => {

options.verbose && msgData.push({ label: 'topic', value: topic })

let payloadMessage = deserializeBufferToProtobuf(payload, protobufPath, protobufMessageName, format)
if (payloadMessage) {
msgData.push({ label: 'payload', value: format ? convertPayload(payloadMessage, format) : payloadMessage })
} else {
msgData.push({ label: 'payload', value: convertPayload(payload, format) })
}
let receivedMessage = processReceivedMessage(payload, protobufPath, protobufMessageName, format)
msgData.push({ label: 'payload', value: receivedMessage })

packet.retain && msgData.push({ label: 'retain', value: packet.retain })

Expand All @@ -91,7 +116,7 @@ const sub = (options: SubscribeOptions) => {

!outputModeClean
? msgLog(msgData)
: console.log(JSON.stringify({ topic, payload: convertPayload(payload, format), packet }, null, 2))
: console.log(JSON.stringify({ topic, payload: convertPayload(payload, format, 'decode'), packet }, null, 2))
})

client.on('error', (err) => {
Expand Down
36 changes: 24 additions & 12 deletions cli/src/utils/convertPayload.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,36 @@
import chalk from 'chalk'

const convertJSON = (value: Buffer) => {
const convertJSON = (value: Buffer | string, action: 'encode' | 'decode') => {
try {
return JSON.stringify(JSON.parse(value.toString()), null, 2)
if (action === 'decode') {
return JSON.stringify(JSON.parse(value.toString()), null, 2)
} else {
return Buffer.from(JSON.stringify(JSON.parse(value.toString())))
}
} catch (err) {
return chalk.red(err)
}
}

const convertPayload = (payload: Buffer, to?: FormatType) => {
switch (to) {
case 'base64':
return payload.toString('base64')
case 'json':
return convertJSON(payload)
case 'hex':
return payload.toString('hex').replace(/(.{4})/g, '$1 ')
default:
return payload.toString('utf-8')
const convertPayload = (payload: Buffer | string, format?: FormatType, action: 'encode' | 'decode' = 'decode') => {
const actions = {
encode: {
base64: () => Buffer.from(payload.toString(), 'base64'),
json: () => convertJSON(payload, 'encode'),
hex: () => Buffer.from(payload.toString().replace(/\s+/g, ''), 'hex'),
default: () => Buffer.from(payload.toString(), 'utf-8'),
},
decode: {
base64: () => payload.toString('base64'),
json: () => convertJSON(payload, 'decode'),
hex: () => payload.toString('hex').replace(/(.{4})/g, '$1 '),
default: () => payload.toString('utf-8'),
},
}
const actionSet = actions[action]
const runAction = actionSet[format || 'default']

return runAction ? runAction() : payload
}

export default convertPayload
91 changes: 33 additions & 58 deletions cli/src/utils/protobuf.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,78 +2,53 @@ import protobuf from 'protobufjs'
import signale from './signale'
import { transformPBJSError } from './protobufErrors'

const convertObject = (raw: string | Buffer, format?: FormatType | undefined) => {
switch (format) {
case 'base64':
return Buffer.from(raw.toString('utf-8'), 'base64').toString('utf-8')
case 'hex':
return Buffer.from(raw.toString('utf-8').replaceAll(' ', ''), 'hex').toString('utf-8')
case 'json':
return JSON.stringify(JSON.parse(raw.toString('utf-8')), null, 2)
default:
return raw.toString('utf-8')
}
}

export const serializeProtobufToBuffer = (
raw: string | Buffer,
protobufPath: string | undefined,
protobufMessageName: string | undefined,
format?: FormatType | undefined,
protobufPath: string,
protobufMessageName: string,
): Buffer => {
let rawData
try {
rawData = convertObject(raw, format)
} catch (error: unknown) {
signale.error(`Message format type error : ${(error as Error).message.split('\n')[0]}`)
process.exit(1)
}

let rawData = raw.toString('utf-8')
let bufferMessage = Buffer.from(rawData)
if (protobufPath && protobufMessageName) {
try {
const root = protobuf.loadSync(protobufPath)
const Message = root.lookupType(protobufMessageName)
const err = Message.verify(JSON.parse(rawData))
if (err) {
signale.error(`Message serialization error: ${err}`)
process.exit(1)
}
const data = Message.create(JSON.parse(rawData))
const serializedMessage = Message.encode(data).finish()
bufferMessage = Buffer.from(serializedMessage)
} catch (error: unknown) {
signale.error(`Message serialization error: ${(error as Error).message.split('\n')[0]}`)
try {
const root = protobuf.loadSync(protobufPath)
const Message = root.lookupType(protobufMessageName)
const err = Message.verify(JSON.parse(rawData))
if (err) {
signale.error(`Message serialization error: ${err}`)
process.exit(1)
}
const data = Message.create(JSON.parse(rawData))
const serializedMessage = Message.encode(data).finish()
bufferMessage = Buffer.from(serializedMessage)
} catch (error: unknown) {
signale.error(`Message serialization error: ${(error as Error).message.split('\n')[0]}`)
process.exit(1)
}
return bufferMessage
}

export const deserializeBufferToProtobuf = (
payload: Buffer,
protobufPath: string | undefined,
protobufMessageName: string | undefined,
to?: FormatType,
protobufPath: string,
protobufMessageName: string,
needFormat: FormatType | undefined,
): any => {
if (protobufPath && protobufMessageName) {
try {
const root = protobuf.loadSync(protobufPath)
const Message = root.lookupType(protobufMessageName)
const MessageData = Message.decode(payload)
const err = Message.verify(MessageData)
if (err) {
signale.error(`Message deserialization error: ${err}`)
process.exit(1)
}
if (to) {
return Buffer.from(JSON.stringify(MessageData.toJSON()))
}
return MessageData
} catch (error: unknown) {
let err = transformPBJSError(error as Error)
signale.error(err.message.split('\n')[0])
try {
const root = protobuf.loadSync(protobufPath)
const Message = root.lookupType(protobufMessageName)
const MessageData = Message.decode(payload)
const err = Message.verify(MessageData)
if (err) {
signale.error(`Message deserialization error: ${err}`)
process.exit(1)
}
if (needFormat) {
return Buffer.from(JSON.stringify(MessageData.toJSON()))
}
return MessageData
} catch (error: unknown) {
let err = transformPBJSError(error as Error)
signale.error(err.message.split('\n')[0])
process.exit(1)
}
}
Loading