Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 141 additions & 51 deletions lua-mosquitto.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ static int mosq_new(lua_State *L)
return luaL_error(L, strerror(errno));
}

ctx->L = L;
ctx->L = NULL;
ctx__on_init(ctx);

luaL_getmetatable(L, MOSQ_META_CTX);
Expand Down Expand Up @@ -422,11 +422,13 @@ static int mosq_loop(lua_State *L, bool forever)
int timeout = luaL_optinteger(L, 2, -1);
int max_packets = luaL_optinteger(L, 3, 1);
int rc;
ctx->L = L;
if (forever) {
rc = mosquitto_loop_forever(ctx->mosq, timeout, max_packets);
} else {
rc = mosquitto_loop(ctx->mosq, timeout, max_packets);
}
ctx->L = NULL;
return mosq__pstatus(L, rc);
}

Expand All @@ -443,8 +445,10 @@ static int ctx_loop_forever(lua_State *L)
static int ctx_loop_start(lua_State *L)
{
ctx_t *ctx = ctx_check(L, 1);
int rc;

int rc = mosquitto_loop_start(ctx->mosq);
ctx->L = L;
rc = mosquitto_loop_start(ctx->mosq);
return mosq__pstatus(L, rc);
}

Expand All @@ -454,6 +458,7 @@ static int ctx_loop_stop(lua_State *L)
bool force = lua_toboolean(L, 2);

int rc = mosquitto_loop_stop(ctx->mosq, force);
ctx->L = NULL;
return mosq__pstatus(L, rc);
}

Expand All @@ -478,25 +483,34 @@ static int ctx_loop_read(lua_State *L)
{
ctx_t *ctx = ctx_check(L, 1);
int max_packets = luaL_optinteger(L, 2, 1);
int rc;

int rc = mosquitto_loop_read(ctx->mosq, max_packets);
ctx->L = L;
rc = mosquitto_loop_read(ctx->mosq, max_packets);
ctx->L = NULL;
return mosq__pstatus(L, rc);
}

static int ctx_loop_write(lua_State *L)
{
ctx_t *ctx = ctx_check(L, 1);
int max_packets = luaL_optinteger(L, 2, 1);
int rc;

int rc = mosquitto_loop_write(ctx->mosq, max_packets);
ctx->L = L;
rc = mosquitto_loop_write(ctx->mosq, max_packets);
ctx->L = NULL;
return mosq__pstatus(L, rc);
}

static int ctx_loop_misc(lua_State *L)
{
ctx_t *ctx = ctx_check(L, 1);
int rc;

int rc = mosquitto_loop_misc(ctx->mosq);
ctx->L = L;
rc = mosquitto_loop_misc(ctx->mosq);
ctx->L = NULL;
return mosq__pstatus(L, rc);
}

Expand All @@ -508,12 +522,9 @@ static int ctx_want_write(lua_State *L)
return 1;
}

static void ctx_on_connect(
struct mosquitto *mosq,
void *obj,
int rc)
{
ctx_t *ctx = obj;
static int ctx_on_connect_safe(lua_State *L) {
int ref = lua_tointeger(L, 1);
int rc = lua_tointeger(L, 2);
bool success = false;
char *str = "reserved for future use";

Expand Down Expand Up @@ -548,22 +559,37 @@ static void ctx_on_connect(
break;
}

lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_connect);
lua_rawgeti(L, LUA_REGISTRYINDEX, ref);

lua_pushboolean(ctx->L, success);
lua_pushinteger(ctx->L, rc);
lua_pushstring(ctx->L, str);
lua_pushboolean(L, success);
lua_pushinteger(L, rc);
lua_pushstring(L, str);

lua_call(ctx->L, 3, 0);
}
lua_call(L, 3, 0);

return 0;
}

static void ctx_on_disconnect(
static void ctx_on_connect(
struct mosquitto *mosq,
void *obj,
int rc)
{
ctx_t *ctx = obj;
lua_State *L = ctx->L;
lua_pushcfunction(L, ctx_on_connect_safe);
lua_pushinteger(L, ctx->on_connect);
lua_pushinteger(L, rc);
if (lua_pcall(L, 2, 0, 0)) {
/* pop error message */
lua_pop(L, 1);
}
}


static int ctx_on_disconnect_safe(lua_State *L) {
int ref = lua_tointeger(L, 1);
int rc = lua_tointeger(L, 2);
bool success = true;
char *str = "client-initiated disconnect";

Expand All @@ -572,13 +598,31 @@ static void ctx_on_disconnect(
str = "unexpected disconnect";
}

lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_disconnect);
lua_rawgeti(L, LUA_REGISTRYINDEX, ref);

lua_pushboolean(L, success);
lua_pushinteger(L, rc);
lua_pushstring(L, str);

lua_pushboolean(ctx->L, success);
lua_pushinteger(ctx->L, rc);
lua_pushstring(ctx->L, str);
lua_call(L, 3, 0);

lua_call(ctx->L, 3, 0);
return 0;
}

static void ctx_on_disconnect(
struct mosquitto *mosq,
void *obj,
int rc)
{
ctx_t *ctx = obj;
lua_State *L = ctx->L;
lua_pushcfunction(L, ctx_on_disconnect_safe);
lua_pushinteger(L, ctx->on_disconnect);
lua_pushinteger(L, rc);
if (lua_pcall(L, 2, 0, 0)) {
/* pop error message */
lua_pop(L, 1);
}
}

static void ctx_on_publish(
Expand All @@ -587,10 +631,31 @@ static void ctx_on_publish(
int mid)
{
ctx_t *ctx = obj;
lua_State *L = ctx->L;
lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->on_publish);
lua_pushinteger(L, mid);
if (lua_pcall(L, 1, 0, 0)) {
/* pop error message */
lua_pop(L, 1);
}
}

static int ctx_on_message_safe(lua_State *L) {
int ref = lua_tointeger(L, 1);
const struct mosquitto_message *msg = lua_touserdata(L, 2);

lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_publish);
lua_pushinteger(ctx->L, mid);
lua_call(ctx->L, 1, 0);
/* push registered Lua callback function onto the stack */
lua_rawgeti(L, LUA_REGISTRYINDEX, ref);
/* push function args */
lua_pushinteger(L, msg->mid);
lua_pushstring(L, msg->topic);
lua_pushlstring(L, msg->payload, msg->payloadlen);
lua_pushinteger(L, msg->qos);
lua_pushboolean(L, msg->retain);

lua_call(L, 5, 0); /* args: mid, topic, payload, qos, retain */

return 0;
}

static void ctx_on_message(
Expand All @@ -599,17 +664,14 @@ static void ctx_on_message(
const struct mosquitto_message *msg)
{
ctx_t *ctx = obj;

/* push registered Lua callback function onto the stack */
lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_message);
/* push function args */
lua_pushinteger(ctx->L, msg->mid);
lua_pushstring(ctx->L, msg->topic);
lua_pushlstring(ctx->L, msg->payload, msg->payloadlen);
lua_pushinteger(ctx->L, msg->qos);
lua_pushboolean(ctx->L, msg->retain);

lua_call(ctx->L, 5, 0); /* args: mid, topic, payload, qos, retain */
lua_State *L = ctx->L;
lua_pushcfunction(L, ctx_on_message_safe);
lua_pushinteger(L, ctx->on_message);
lua_pushlightuserdata(L, (void*)msg);
if (lua_pcall(L, 2, 0, 0)) {
/* pop error message */
lua_pop(L, 1);
}
}

static void ctx_on_subscribe(
Expand All @@ -620,16 +682,24 @@ static void ctx_on_subscribe(
const int *granted_qos)
{
ctx_t *ctx = obj;
lua_State *L = ctx->L;
int i;

lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_subscribe);
lua_pushinteger(ctx->L, mid);
if (!lua_checkstack(L, qos_count + 2)) {
/* can't allocate enough stack space */
return;
}

lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->on_subscribe);
lua_pushinteger(L, mid);
for (i = 0; i < qos_count; i++) {
lua_pushinteger(ctx->L, granted_qos[i]);
lua_pushinteger(L, granted_qos[i]);
}

lua_call(ctx->L, qos_count + 1, 0);
if (lua_pcall(L, qos_count + 1, 0, 0)) {
/* pop error message */
lua_pop(L, 1);
}
}

static void ctx_on_unsubscribe(
Expand All @@ -638,10 +708,28 @@ static void ctx_on_unsubscribe(
int mid)
{
ctx_t *ctx = obj;
lua_State *L = ctx->L;
lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->on_unsubscribe);
lua_pushinteger(L, mid);
if (lua_pcall(L, 1, 0, 0)) {
/* pop error message */
lua_pop(L, 1);
}
}

static int ctx_on_log_safe(lua_State *L) {
int ref = lua_tointeger(L, 1);
int level = lua_tointeger(L, 2);
const char *str = lua_touserdata(L, 3);

lua_rawgeti(L, LUA_REGISTRYINDEX, ref);

lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_unsubscribe);
lua_pushinteger(ctx->L, mid);
lua_call(ctx->L, 1, 0);
lua_pushinteger(L, level);
lua_pushstring(L, str);

lua_call(L, 2, 0);

return 0;
}

static void ctx_on_log(
Expand All @@ -651,13 +739,15 @@ static void ctx_on_log(
const char *str)
{
ctx_t *ctx = obj;

lua_rawgeti(ctx->L, LUA_REGISTRYINDEX, ctx->on_log);

lua_pushinteger(ctx->L, level);
lua_pushstring(ctx->L, str);

lua_call(ctx->L, 2, 0);
lua_State *L = ctx->L;
lua_pushcfunction(L, ctx_on_log_safe);
lua_pushinteger(L, ctx->on_log);
lua_pushinteger(L, level);
lua_pushlightuserdata(L, (void*)str);
if (lua_pcall(L, 3, 0, 0)) {
/* pop error message */
lua_pop(L, 1);
}
}

static int callback_type_from_string(const char *);
Expand Down