diff --git a/cli/src/lib/pub.ts b/cli/src/lib/pub.ts index 45e768f28..85bc44e41 100644 --- a/cli/src/lib/pub.ts +++ b/cli/src/lib/pub.ts @@ -197,6 +197,46 @@ const multiSend = ( }) } +const handlePipedMultiline = (connOpts: IClientOptions, pubOpts: { topic: string; opts: IClientPublishOptions }) => { + const client = mqtt.connect(connOpts) + let messageQueue: string[] = [] + let publishedCount = 0 + + process.stdin.pipe(split2()).on('data', (chunk) => { + const message = chunk.toString() + if (message.length > 0) { + messageQueue.push(message) + } + }) + + client.on('connect', () => { + basicLog.connected() + if (messageQueue.length > 0) { + logWrapper.await(`Publishing ${messageQueue.length} messages...`) + messageQueue.forEach((message) => { + client.publish(pubOpts.topic, Buffer.from(message), pubOpts.opts, (err) => { + if (err) { + basicLog.error(err) + } else { + publishedCount++ + if (publishedCount === messageQueue.length) { + logWrapper.success(`Successfully published ${publishedCount} messages`) + } + } + }) + }) + setTimeout(() => { + client.end() + }, 1000) + } + }) + + client.on('error', (err) => { + basicLog.error(err) + client.end() + }) +} + const handleFileRead = (filePath: string) => { try { basicLog.fileReading() @@ -231,16 +271,21 @@ const pub = (options: PublishOptions) => { const pubOpts = parsePublishOptions(options) const handleStdin = () => { - if (options.multiline) { - multiSend(loadOptions, connOpts, pubOpts, options.maximumReconnectTimes) - } else { - process.stdin.pipe( + // One line mode + if (!options.multiline) { + return process.stdin.pipe( concat((data) => { pubOpts.message = data send(loadOptions, connOpts, pubOpts, options.maximumReconnectTimes) }), ) } + + // Multiline mode + const isPiped = !process.stdin.isTTY + return isPiped + ? handlePipedMultiline(connOpts, pubOpts) + : multiSend(loadOptions, connOpts, pubOpts, options.maximumReconnectTimes) } if (options.fileRead) {