You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
conf->set("metadata.broker.list", bootstrap_servers, errstr);
// Set delivery report callback during producer creation
DeliveryReportCallback delivery_report_cb;
conf->set("dr_cb", &delivery_report_cb, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
delete conf;
return;
}
// Create a handle (either producer or consumer, depending on your use case)
RdKafka::Handle *handle = static_cast<RdKafka::Handle*>(producer);
// Create a topic
RdKafka::Topic *rd_topic = RdKafka::Topic::create(handle, topic, nullptr, errstr);
if (!rd_topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
delete producer;
delete conf;
return;
}
// Produce message
RdKafka::ErrorCode resp = producer->produce(rd_topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(message.c_str()), message.size(), nullptr, 0, nullptr);
if (resp != RdKafka::ERR_NO_ERROR) {
std::cerr << "Failed to produce message: " << RdKafka::err2str(resp) << std::endl;
}
producer->poll(0); // Trigger delivery report callback
delete rd_topic;
delete producer;
delete conf;
}
// Entry point: hands one fixed greeting to produce_message() and exits with status 0.
int main() {
    // Broker list, destination topic, and payload for the single message.
    const std::string brokers{"localhost:9092"};
    const std::string destination_topic{"Test"};
    const std::string payload{"Hello, Kafka!"};

    produce_message(brokers, destination_topic, payload);
    return 0;
}
I am getting the segmentation fault below.
main" received signal SIGSEGV, Segmentation fault.
0x00007ffff7f9cc93 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&)
I was using the command below.
g++ -g -o main test1.cpp -lrdkafka++
Please point out the problem in the code, or let me know how I can set up a C++ Kafka consumer and producer using librdkafka.
Thanks in advance.
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
-
#include <iostream>
#include <string>

#include <librdkafka/rdkafkacpp.h>
// Delivery report handler that logs the outcome of a produced message.
class DeliveryReportCallback : public RdKafka::DeliveryReportCb {
public:
    // Writes a success line (topic name and partition) to stdout when the
    // message carries no error; otherwise writes the error string to stderr.
    void dr_cb(RdKafka::Message &message) override {
        if (!message.err()) {
            std::cout << "Message delivered to topic " << message.topic_name()
                      << " [" << message.partition() << "]" << std::endl;
            return;
        }
        std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
    }
};
// As shown here, this function only declares a local error-string buffer and
// returns; none of its three parameters are read.
// NOTE(review): the body appears truncated in this excerpt. The statements
// earlier on the page use `bootstrap_servers`, `topic`, `message` and `errstr`
// in exactly this way, so they presumably belong here — confirm against the
// original post before relying on this definition.
void produce_message(const std::string& bootstrap_servers, const std::string& topic, const std::string& message) {
// Presumably receives error text from librdkafka calls; nothing reads it in this excerpt.
std::string errstr;
}
// Program entry point. As shown here it only initializes the broker address
// string and falls off the end of main (which yields exit status 0).
// NOTE(review): this looks like a truncated copy of the fuller main() earlier
// on the page, which also sets a topic and message and calls
// produce_message() — confirm which version is intended.
int main() {
// Set your Kafka broker's bootstrap servers
std::string bootstrap_servers = "localhost:9092";
}
I am getting the segmentation fault below.
main" received signal SIGSEGV, Segmentation fault.
0x00007ffff7f9cc93 in RdKafka::Topic::create(RdKafka::Handle*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, RdKafka::Conf*, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >&)
I was using the command below.
g++ -g -o main test1.cpp -lrdkafka++
Please point out the problem in the code, or let me know how I can set up a C++ Kafka consumer and producer using librdkafka.
Thanks in advance.
Beta Was this translation helpful? Give feedback.
All reactions