Skip to content

Commit

Permalink
feature: allow messages and producers to handle int64 sequences
Browse files Browse the repository at this point in the history
  • Loading branch information
alanhoff committed Sep 27, 2020
1 parent 52777a8 commit 3615270
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 1 deletion.
6 changes: 6 additions & 0 deletions src/Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "Message.h"
#include "MessageId.h"
#include <pulsar/c/message.h>
#include <string>

static const std::string CFG_DATA = "data";
static const std::string CFG_PROPS = "properties";
Expand Down Expand Up @@ -176,6 +177,11 @@ pulsar_message_t *Message::BuildMessage(Napi::Object conf) {
pulsar_message_set_sequence_id(cMessage, sequenceId.Int64Value());
}

if (conf.Has(CFG_SEQUENCE_ID) && conf.Get(CFG_SEQUENCE_ID).IsString()) {
std::string sequenceId = conf.Get(CFG_SEQUENCE_ID).ToString().Utf8Value();
pulsar_message_set_sequence_id(cMessage, std::strtoll(sequenceId.c_str(), 0, 10));
}

if (conf.Has(CFG_PARTITION_KEY) && conf.Get(CFG_PARTITION_KEY).IsString()) {
Napi::String partitionKey = conf.Get(CFG_PARTITION_KEY).ToString();
pulsar_message_set_partition_key(cMessage, partitionKey.Utf8Value().c_str());
Expand Down
10 changes: 9 additions & 1 deletion src/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "Message.h"
#include <pulsar/c/result.h>
#include <memory>
#include <string>
Napi::FunctionReference Producer::constructor;

void Producer::Init(Napi::Env env, Napi::Object exports) {
Expand All @@ -30,7 +31,8 @@ void Producer::Init(Napi::Env env, Napi::Object exports) {
Napi::Function func =
DefineClass(env, "Producer",
{InstanceMethod("send", &Producer::Send), InstanceMethod("flush", &Producer::Flush),
InstanceMethod("close", &Producer::Close)});
InstanceMethod("close", &Producer::Close),
InstanceMethod("getLastSequenceId", &Producer::GetLastSequenceId)});

constructor = Napi::Persistent(func);
constructor.SuppressDestruct();
Expand Down Expand Up @@ -183,4 +185,10 @@ Napi::Value Producer::Close(const Napi::CallbackInfo &info) {
return deferred.Promise();
}

Napi::Value Producer::GetLastSequenceId(const Napi::CallbackInfo &info) {
Napi::Env env = info.Env();
int64_t lastSequenceId = pulsar_producer_get_last_sequence_id(this->cProducer);
return Napi::String::New(env, std::to_string(lastSequenceId));
}

Producer::~Producer() { pulsar_producer_free(this->cProducer); }
1 change: 1 addition & 0 deletions src/Producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Producer : public Napi::ObjectWrap<Producer> {
Napi::Value Send(const Napi::CallbackInfo &info);
Napi::Value Flush(const Napi::CallbackInfo &info);
Napi::Value Close(const Napi::CallbackInfo &info);
Napi::Value GetLastSequenceId(const Napi::CallbackInfo &info);
};

#endif
7 changes: 7 additions & 0 deletions src/ProducerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "ProducerConfig.h"
#include <map>
#include <string>

static const std::string CFG_TOPIC = "topic";
static const std::string CFG_PRODUCER_NAME = "producerName";
Expand Down Expand Up @@ -78,6 +79,12 @@ ProducerConfig::ProducerConfig(const Napi::Object& producerConfig) : topic("") {
pulsar_producer_configuration_set_initial_sequence_id(this->cProducerConfig, initialSequenceId);
}

if (producerConfig.Has(CFG_INIT_SEQUENCE_ID) && producerConfig.Get(CFG_INIT_SEQUENCE_ID).IsString()) {
std::string initialSequenceId = producerConfig.Get(CFG_INIT_SEQUENCE_ID).ToString().Utf8Value();
pulsar_producer_configuration_set_initial_sequence_id(this->cProducerConfig,
std::strtoll(initialSequenceId.c_str(), 0, 10));
}

if (producerConfig.Has(CFG_MAX_PENDING) && producerConfig.Get(CFG_MAX_PENDING).IsNumber()) {
int32_t maxPendingMessages = producerConfig.Get(CFG_MAX_PENDING).ToNumber().Int32Value();
if (maxPendingMessages > 0) {
Expand Down
48 changes: 48 additions & 0 deletions tests/producer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,54 @@ const Pulsar = require('../index.js');
batchingEnabled: true,
})).rejects.toThrow('Failed to create producer: ConnectError');
});

test('Sequence ID as Number', async () => {
const producer = await client.createProducer({
topic: 'persistent://public/default/sequence-id-number',
initialSequenceId: 100,
});

expect(producer.getLastSequenceId()).toEqual('100');

await producer.send({
data: Buffer.from('testing'),
});
await producer.flush();
expect(producer.getLastSequenceId()).toEqual('101');

await producer.send({
sequenceId: 105,
data: Buffer.from('testing'),
});
await producer.flush();
expect(producer.getLastSequenceId()).toEqual('105');

await producer.close();
});

test('Sequence ID as String', async () => {
const producer = await client.createProducer({
topic: 'persistent://public/default/sequence-id-string',
initialSequenceId: '100',
});

expect(producer.getLastSequenceId()).toEqual('100');

await producer.send({
data: Buffer.from('testing'),
});
await producer.flush();
expect(producer.getLastSequenceId()).toEqual('101');

await producer.send({
sequenceId: '105',
data: Buffer.from('testing'),
});
await producer.flush();
expect(producer.getLastSequenceId()).toEqual('105');

await producer.close();
});
});
});
})();

0 comments on commit 3615270

Please sign in to comment.