Sharing a working demo of JSON value serialization in AVRO using C language #60

Closed
hongbo-miao opened this issue Aug 10, 2024 · 1 comment


hongbo-miao commented Aug 10, 2024

Introduction

The demo in this repo serializes a plain string value, but in the real world it is more common to serialize a JSON value (a structured record).
I saw many people, including myself, run into issues with this, and it took me some time to get it working.
So I just want to share a working demo of JSON value serialization in Avro using the C language. ☺️

Setup

I registered the schema with Confluent Schema Registry:

curl --location 'https://hm-confluent-schema-registry.internal.hongbomiao.com/subjects/production.iot.device.json-value/versions' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--data '{
    "schema": "{\"type\": \"record\", \"name\": \"hongbo_test\", \"fields\":[{ \"name\": \"status\", \"type\": \"string\"},{ \"name\": \"location\", \"type\": \"string\"},{ \"name\": \"type\", \"type\": \"string\"},{ \"name\": \"temperature\", \"type\": \"long\"},{ \"name\": \"humidity\", \"type\": \"double\"},{ \"name\": \"battery\", \"type\": \"long\"},{ \"name\": \"signal_strength\", \"type\": \"long\"},{ \"name\": \"mode\", \"type\": \"string\"},{ \"name\": \"active\", \"type\": \"boolean\"}]}"
}'
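
To double-check the registration, you can fetch the latest registered version of the subject back:

curl --location 'https://hm-confluent-schema-registry.internal.hongbomiao.com/subjects/production.iot.device.json-value/versions/latest'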

Code

main.c

#include <avro.h>
#include <glib.h>
#include <librdkafka/rdkafka.h>
#include <libserdes/serdes-avro.h>
#include <libserdes/serdes.h>
#include <signal.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

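// config.c is #include'd directly so that CMakeLists.txt only compiles main.c.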
#include "config.c"

#define ARR_SIZE(arr) (sizeof((arr)) / sizeof((arr[0])))

static volatile bool is_running = true;

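/* Delivery report callback, invoked from rd_kafka_poll() for each produced
   message. Note that g_error() is fatal in GLib, so a single failed delivery
   aborts the whole producer. */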
static void delivery_report(rd_kafka_t *kafka_handle,
                            const rd_kafka_message_t *rkmessage, void *opaque) {
  if (rkmessage->err) {
    g_error("Message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
  }
}

void signal_handler(int signal) {
  if (signal == SIGINT || signal == SIGTERM) {
    is_running = false;
  }
}

int main(int argc, char **argv) {
  rd_kafka_t *producer;
  rd_kafka_conf_t *conf;
  serdes_conf_t *serdes_conf;
  serdes_t *serdes;
  char errstr[512];

  if (argc != 2) {
    g_error("Usage: %s <config.ini>", argv[0]);
    return 1;
  }

  const char *config_file = argv[1];

  g_autoptr(GError) error = NULL;
  g_autoptr(GKeyFile) key_file = g_key_file_new();
  if (!g_key_file_load_from_file(key_file, config_file, G_KEY_FILE_NONE,
                                 &error)) {
    g_error("Error loading config file: %s", error->message);
    return 1;
  }

  conf = rd_kafka_conf_new();
  load_config_group(conf, key_file, "default");

  rd_kafka_conf_set(conf, "queue.buffering.max.messages", "10000000", NULL, 0);
  rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "10485760", NULL, 0);
  rd_kafka_conf_set(conf, "batch.size", "65536", NULL, 0);
  rd_kafka_conf_set(conf, "linger.ms", "5", NULL, 0);
  rd_kafka_conf_set_dr_msg_cb(conf, delivery_report);

  producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
  if (!producer) {
    g_error("Failed to create new producer: %s", errstr);
    return 1;
  }

  signal(SIGINT, signal_handler);
  signal(SIGTERM, signal_handler);

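  /* serdes_conf_new() takes a NULL-terminated list of "key", "value" string
     pairs; schema.registry.url is the only setting needed here. */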
  serdes_conf = serdes_conf_new(
      NULL, 0, "schema.registry.url",
      "https://hm-confluent-schema-registry.internal.hongbomiao.com", NULL);

  serdes = serdes_new(serdes_conf, errstr, sizeof(errstr));
  if (!serdes) {
    g_error("Failed to create serdes instance: %s", errstr);
    return 1;
  }

  const char *topic = "production.iot.device.json";
  const char *schema_name = "production.iot.device.json-value";
  serdes_schema_t *serdes_schema =
      serdes_schema_get(serdes, schema_name, -1, errstr, sizeof(errstr));
  if (!serdes_schema) {
    g_error("Failed to retrieve AVRO schema: %s", errstr);
    return 1;
  }

  const char *device_ids[6] = {"device1", "device2", "device3",
                               "device4", "device5", "device6"};
  const char *status_list[3] = {"online", "offline", "maintenance"};
  const char *locations[3] = {"locationA", "locationB", "locationC"};
  const char *types[3] = {"type1", "type2", "type3"};

  srandom(time(NULL));  // Seed the random number generator

  while (is_running) {
    const char *key = device_ids[random() % ARR_SIZE(device_ids)];

    const char *status = status_list[random() % ARR_SIZE(status_list)];
    const char *location = locations[random() % ARR_SIZE(locations)];
    const char *type = types[random() % ARR_SIZE(types)];
    long temperature = (random() % 101) - 50;  // -50..50; the schema declares this field as long
    double humidity = ((double)random() / RAND_MAX);
    int battery = random() % 101;
    int signal_strength = random() % 101;
    const char *mode = (random() % 2) ? "manual" : "auto";
    bool active = (random() % 2);

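    /* Build a generic Avro record from the schema object that libserdes
       fetched from the registry. Creating the value class on every iteration
       works, but it could also be hoisted above the loop. */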
    avro_schema_t schema = serdes_schema_avro(serdes_schema);
    avro_value_iface_t *record_class =
        avro_generic_class_from_schema(schema);

    avro_value_t record;
    avro_generic_value_new(record_class, &record);

    avro_value_t field;
    if (avro_value_get_by_name(&record, "status", &field, NULL) == 0) {
      avro_value_set_string(&field, status);
    }
    if (avro_value_get_by_name(&record, "location", &field, NULL) == 0) {
      avro_value_set_string(&field, location);
    }
    if (avro_value_get_by_name(&record, "type", &field, NULL) == 0) {
      avro_value_set_string(&field, type);
    }
    if (avro_value_get_by_name(&record, "temperature", &field, NULL) == 0) {
      avro_value_set_long(&field, temperature);
    }
    if (avro_value_get_by_name(&record, "humidity", &field, NULL) == 0) {
      avro_value_set_double(&field, humidity);
    }
    if (avro_value_get_by_name(&record, "battery", &field, NULL) == 0) {
      avro_value_set_long(&field, battery);
    }
    if (avro_value_get_by_name(&record, "signal_strength", &field, NULL) == 0) {
      avro_value_set_long(&field, signal_strength);
    }
    if (avro_value_get_by_name(&record, "mode", &field, NULL) == 0) {
      avro_value_set_string(&field, mode);
    }
    if (avro_value_get_by_name(&record, "active", &field, NULL) == 0) {
      avro_value_set_boolean(&field, active);
    }

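    /* serdes_schema_serialize_avro() Avro-encodes the record and, with the
       default framing, prepends the Confluent wire format (magic byte plus
       4-byte schema ID) so consumers can look the schema up again. */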
    void *avro_payload = NULL;
    size_t avro_size;
    serdes_err_t serr = serdes_schema_serialize_avro(
        serdes_schema, &record, &avro_payload, &avro_size, errstr, sizeof(errstr));
    if (serr != SERDES_ERR_OK) {
      g_error("Failed to serialize data: %s", serdes_err2str(serr));
      return 1;
    }

    rd_kafka_resp_err_t err;
    err = rd_kafka_producev(producer, RD_KAFKA_V_TOPIC(topic),
                            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                            RD_KAFKA_V_KEY((void *)key, strlen(key)),
                            RD_KAFKA_V_VALUE(avro_payload, avro_size),
                            RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END);

    if (err) {
      g_error("Failed to produce to topic %s: %s", topic,
              rd_kafka_err2str(err));
      return 1;
    }

    free(avro_payload);
    avro_value_decref(&record);
    avro_value_iface_decref(record_class);  // release the value class created above
    rd_kafka_poll(producer, 0);
    g_usleep(5);  // μs
  }

  g_message("Flushing final messages ...");
  rd_kafka_flush(producer, 10 * 1000);

  if (rd_kafka_outq_len(producer) > 0) {
    g_error("%d message(s) were not delivered", rd_kafka_outq_len(producer));
  }

  g_message("Producer stopped.");
  rd_kafka_destroy(producer);
  serdes_schema_destroy(serdes_schema);
  serdes_destroy(serdes);
  return 0;
}

config.c

#include <glib.h>
#include <librdkafka/rdkafka.h>  // for rd_kafka_conf_t; harmless when #include'd into main.c

static void load_config_group(rd_kafka_conf_t *conf, GKeyFile *key_file,
                              const char *group) {
  char errstr[512];
  g_autoptr(GError) error = NULL;

  gchar **keys = g_key_file_get_keys(key_file, group, NULL, &error);
  if (error) {
    g_error("%s", error->message);
    exit(1);
  }

  // Iterate with a separate cursor so the original array can be freed afterwards.
  for (gchar **ptr = keys; *ptr != NULL; ptr++) {
    const char *key = *ptr;
    g_autofree gchar *value =
        g_key_file_get_string(key_file, group, key, &error);

    if (error) {
      g_error("Reading key: %s", error->message);
      exit(1);
    }

    if (rd_kafka_conf_set(conf, key, value, errstr, sizeof(errstr)) !=
        RD_KAFKA_CONF_OK) {
      g_error("%s", errstr);
      exit(1);
    }
  }

  g_strfreev(keys);
}

config.ini

[default]
bootstrap.servers=b-1.productioniotkafka.xxxxxx.xxx.kafka.us-west-2.amazonaws.com:9096,b-2.productioniotkafka.xxxxxx.xxx.kafka.us-west-2.amazonaws.com:9096,b-3.productioniotkafka.xxxxxx.xxx.kafka.us-west-2.amazonaws.com:9096
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.username=iot_kafka_producer
sasl.password=xxx
compression.codec=zstd

CMakeLists.txt

cmake_minimum_required(VERSION 3.13)
project(producer)

set(CMAKE_C_STANDARD 99)
set(TARGET main)
set(SOURCE_FILES main.c)

find_package(PkgConfig REQUIRED)
pkg_check_modules(glib REQUIRED IMPORTED_TARGET glib-2.0)
pkg_check_modules(rdkafka REQUIRED IMPORTED_TARGET rdkafka)

include_directories(/opt/homebrew/opt/avro-c/include)
include_directories(/opt/homebrew/opt/libserdes/include)

link_directories(/opt/homebrew/opt/avro-c/lib)
link_directories(/opt/homebrew/opt/libserdes/lib)

add_executable(${TARGET} ${SOURCE_FILES})

target_link_libraries(${TARGET} avro serdes PkgConfig::glib PkgConfig::rdkafka)

Note: I am on macOS, so I installed the dependencies with Homebrew:

brew install avro-c
brew install libserdes
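
Then build and run (the /opt/homebrew prefixes in CMakeLists.txt assume an Apple Silicon install; adjust them if yours differ):

cmake -B build
cmake --build build
./build/main config.ini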

Result

I deployed the open-source Redpanda Console and Confluent Schema Registry, and you can see that Redpanda Console deserializes the messages successfully:

(screenshot: Redpanda Console showing the deserialized device records)

Notes

The line avro_schema_t schema = serdes_schema_avro(serdes_schema); is important: it ties the generic record you build to the exact schema that libserdes fetched from the registry.

Without it, the data will not be serialized correctly:

(screenshot: incorrectly serialized data that Redpanda Console cannot deserialize)


hongbo-miao commented Aug 12, 2024

If you need the value to be optional (nullable), I posted the details at https://stackoverflow.com/a/78859674/2000548
Hopefully it is helpful ☺️
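
For reference, here is a minimal sketch of what setting an optional field looks like with avro-c's generic API, assuming the field is declared as the union ["null", "string"] (so branch 0 is "null" and branch 1 is "string"; see the StackOverflow answer for the full schema change):

avro_value_t field, branch;
if (avro_value_get_by_name(&record, "status", &field, NULL) == 0) {
  if (status != NULL) {
    avro_value_set_branch(&field, 1, &branch);  // branch 1: "string"
    avro_value_set_string(&branch, status);
  } else {
    avro_value_set_branch(&field, 0, &branch);  // branch 0: "null"
    avro_value_set_null(&branch);
  }
}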
