From e1693bd05ea58ffc0b056c934019491c6234a831 Mon Sep 17 00:00:00 2001 From: JC Wang Date: Mon, 11 Nov 2024 22:49:32 +0800 Subject: [PATCH] Support server mode --- CHANGELOG.md | 24 +++ README.md | 8 +- bench/basic/bench-boostmidx.cpp | 4 +- bench/basic/bench-crossdb.c | 16 +- bench/basic/bench-sqlite.c | 2 +- bench/basic/bench-stlmap.cpp | 2 +- bench/basic/bench.h | 41 ++++- examples/c/example.c | 3 +- include/crossdb.h | 10 +- src/admin/xdb_shell.c | 84 +++++---- src/admin/xdb_shell.h | 2 +- src/core/xdb_cfg.h | 9 +- src/core/xdb_common.h | 9 +- src/core/xdb_conn.c | 2 +- src/core/xdb_conn.h | 5 +- src/core/xdb_crud.c | 33 ++-- src/core/xdb_db.c | 36 ++-- src/core/xdb_db.h | 1 + src/core/xdb_sql.c | 68 +++++++- src/core/xdb_sysdb.c | 28 +++ src/core/xdb_table.c | 13 +- src/core/xdb_trans.c | 9 +- src/core/xdb_trans.h | 44 +++++ src/core/xdb_wal.c | 16 +- src/core/xdb_wal.h | 23 ++- src/crossdb.c | 4 + src/lib/xdb_lib.c | 39 ++++- src/lib/xdb_lib.h | 36 ++-- src/lib/xdb_sock.c | 75 +++++++- src/lib/xdb_thread.c | 26 ++- src/parser/xdb_parser.c | 23 ++- src/parser/xdb_parser_db.c | 16 +- src/parser/xdb_parser_svr.c | 5 +- src/parser/xdb_stmt.h | 38 ++++- src/server/xdb_client.c | 223 ++++++++++++++++++++++++ src/server/xdb_server.c | 293 ++++++++++++++++++++++++++++++++ src/server/xdb_server.h | 35 ++++ src/xdb-cli.c | 30 ++-- winbuild.cmd | 2 +- 39 files changed, 1168 insertions(+), 169 deletions(-) create mode 100644 src/server/xdb_client.c create mode 100644 src/server/xdb_server.c create mode 100644 src/server/xdb_server.h diff --git a/CHANGELOG.md b/CHANGELOG.md index e68e7da..8ca648b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,28 @@ --> +## 0.11.0 (2024-11-11) + +**Features** + +- Support data types: `BOOL`, `TIMESTAMP` +- Support SQL statements: `CREATE SERVER` `DROP SERVER` `SHOW SERVERS` +- Support APIs: `xdb_connect` +- Support embedded SERVER mode +- Support `xdb-cli` standalone server mode +- Support telnet connection + +**Improvements** + +**Test** + +**Bug Fixes** + +- INSERT without column list will set all columns to NULL +- WAL flush with wrong address and range +- Crash when table drop during flush + + ## 0.10.0 (2024-11-01) **Features** @@ -46,6 +68,7 @@ - Create duplicate database doesn't report error sometimes + ## 0.9.0 (2024-10-11) **Features** @@ -75,6 +98,7 @@ - Update transaction crash issue - SQL syntax error + ## 0.8.0 (2024-09-03) **Features** diff --git a/README.md b/README.md index 5313b18..2c6e76a 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ It is designed for high-performance scenarios where the main memory can hold the - Supports the standard RDBMS model. - Supports standard SQL and many extensions from MySQL. - Supports multiple databases. -- Supports embedded and client-server modes (TBD). +- Supports embedded and client-server modes. - Supports primary keys and multiple secondary indexes. - Supports HASH and RBTREE (TBD) indexes. - Supports multi-column indexes. @@ -128,3 +128,9 @@ https://crossdb.org/client/api-c/ ### Tutorial https://crossdb.org/get-started/tutorial/ + +### CrossDB Server + +https://crossdb.org/develop/server/ + +https://crossdb.org/admin/shell/#connect-to-crossdb-server diff --git a/bench/basic/bench-boostmidx.cpp b/bench/basic/bench-boostmidx.cpp index 7457031..88916f7 100644 --- a/bench/basic/bench-boostmidx.cpp +++ b/bench/basic/bench-boostmidx.cpp @@ -7,8 +7,8 @@ #include #define BENCH_DBNAME "Boost" -#define LKUP_COUNT 10000000 #define TEST_NAME(i) i?"Hash":"Order" +int LKUP_COUNT = 10000000; using namespace std; @@ -197,4 +197,4 @@ bool bench_stmt_del_byid (void *pStmt, int id) bool ok = find_id_hmap.erase(id); pthread_rwlock_unlock(&stu_tbl_lock); return ok; -} \ No newline at end of file +} diff --git a/bench/basic/bench-crossdb.c b/bench/basic/bench-crossdb.c index b7ef4f0..4a3bb70 100644 --- a/bench/basic/bench-crossdb.c +++ b/bench/basic/bench-crossdb.c @@ -1,13 +1,22 @@ #include #define BENCH_DBNAME "CrossDB" -#define LKUP_COUNT 10000000 + +int LKUP_COUNT = 10000000; #include "bench.h" void* bench_open (const char *db) { - xdb_conn_t* pConn = xdb_open (db); + xdb_conn_t* pConn; + if (!s_bench_svr) { + pConn = xdb_open (db); + } else { + pConn = xdb_connect (NULL, NULL, NULL, NULL, 7777); + xdb_bexec (pConn, "CREATE DATABASE school ENGINE=MEMORY"); + xdb_bexec (pConn, "USE school"); + LKUP_COUNT = 100000; + } XDB_CHECK (NULL != pConn, printf ("Can't open connection:\n"); return NULL;); return pConn; } @@ -44,6 +53,9 @@ bool bench_sql_get_byid (void *pConn, const char *sql, int id, stu_callback call bool bench_sql_updAge_byid (void *pConn, const char *sql, int id, int age) { xdb_res_t *pRes = xdb_bexec (pConn, sql, age, id); + if (pRes->affected_rows != 1) { + printf ("wrong\n"); + } return 1 == pRes->affected_rows; } diff --git a/bench/basic/bench-sqlite.c b/bench/basic/bench-sqlite.c index a56558c..85ccc05 100644 --- a/bench/basic/bench-sqlite.c +++ b/bench/basic/bench-sqlite.c @@ -2,7 +2,7 @@ #include #define BENCH_DBNAME "SQLite" -#define LKUP_COUNT 1000000 +int LKUP_COUNT = 1000000; #include "bench.h" diff --git a/bench/basic/bench-stlmap.cpp b/bench/basic/bench-stlmap.cpp index bd4e86a..3923386 100644 --- a/bench/basic/bench-stlmap.cpp +++ b/bench/basic/bench-stlmap.cpp @@ -4,8 +4,8 @@ #include #define BENCH_DBNAME "STL" -#define LKUP_COUNT 10000000 #define TEST_NAME(i) i?"HashMap":"Map" +int LKUP_COUNT = 10000000; using namespace std; diff --git a/bench/basic/bench.h b/bench/basic/bench.h index af9b0eb..d33c7ba 100644 --- a/bench/basic/bench.h +++ b/bench/basic/bench.h @@ -8,8 +8,9 @@ #include #include -#define SQL_LKUP_COUNT LKUP_COUNT/5 -#define UPD_COUNT LKUP_COUNT/10 +extern int LKUP_COUNT; +int SQL_LKUP_COUNT; +int UPD_COUNT; #define BENCH_CHECK(expr, action...) if (!(expr)) { action; } @@ -128,6 +129,8 @@ void stu_count_cb (void *pArg, int id, string &name, int age, string &cls, int s #endif +bool s_bench_svr = false; + #define BENCH_SQL_CREATE "CREATE TABLE student (id INT PRIMARY KEY, name CHAR(16), age INT, class CHAR(16), score INT)" #define BENCH_SQL_DROP "DROP TABLE IF EXISTS student" #define BENCH_SQL_INSERT "INSERT INTO student (id,name,age,class,score) VALUES (?,?,?,?,?)" @@ -135,6 +138,10 @@ void stu_count_cb (void *pArg, int id, string &name, int age, string &cls, int s #define BENCH_SQL_UDPAGE_BYID "UPDATE student SET age=? WHERE id=?" #define BENCH_SQL_DEL_BYID "DELETE FROM student WHERE id=?" +#define BENCH_SQL_INSERT_NET "INSERT INTO student (id,name,age,class,score) VALUES (%d,'%s',%d,'%s',%d)" +#define BENCH_SQL_GET_BYID_NET "SELECT * FROM student WHERE id=%d" +#define BENCH_SQL_UDPAGE_BYID_NET "UPDATE student SET age=? WHERE id=%d" +#define BENCH_SQL_DEL_BYID_NET "DELETE FROM student WHERE id=%d" void* bench_open (const char *db); void bench_close (void *pConn); @@ -162,6 +169,10 @@ bool bench_stmt_get_byid (void *pStmt, int id, stu_callback callback, void *pArg void bench_sql_test (void *pConn, int STU_COUNT, bool bRand, bench_result_t *pResult) { bool ok; + const char *sql_insert = !s_bench_svr ? BENCH_SQL_INSERT : BENCH_SQL_INSERT_NET; + const char *sql_getbyid = !s_bench_svr ? BENCH_SQL_GET_BYID : BENCH_SQL_GET_BYID_NET; + const char *sql_upagebyid = !s_bench_svr ? BENCH_SQL_UDPAGE_BYID : BENCH_SQL_UDPAGE_BYID_NET; + const char *sql_delbyid = !s_bench_svr ? BENCH_SQL_DEL_BYID : BENCH_SQL_DEL_BYID_NET; bench_sql (pConn, BENCH_SQL_DROP); bench_sql (pConn, BENCH_SQL_CREATE); @@ -171,7 +182,7 @@ void bench_sql_test (void *pConn, int STU_COUNT, bool bRand, bench_result_t *pRe bench_print ("------------ INSERT %s ------------\n", qps2str(STU_COUNT)); bench_ts_beg(); for (int i = 0; i < STU_COUNT; ++i) { - ok = bench_sql_insert (pConn, BENCH_SQL_INSERT, STU_BASEID+i, STU_NAME(i), STU_AGE(i), STU_CLASS(i), STU_SCORE(i)); + ok = bench_sql_insert (pConn, sql_insert, STU_BASEID+i, STU_NAME(i), STU_AGE(i), STU_CLASS(i), STU_SCORE(i)); BENCH_CHECK (ok, bench_print ("Can't insert student id=%d\n", STU_BASEID+i); return;); } pResult->insert_qps += bench_ts_end (STU_COUNT); @@ -182,7 +193,7 @@ void bench_sql_test (void *pConn, int STU_COUNT, bool bRand, bench_result_t *pRe int count = 0; bench_ts_beg(); for (int i = 0; i < SQL_LKUP_COUNT; ++i) { - ok = bench_sql_get_byid (pConn, BENCH_SQL_GET_BYID, STU_ID(i), stu_count_cb, &count); + ok = bench_sql_get_byid (pConn, sql_getbyid, STU_ID(i), stu_count_cb, &count); BENCH_CHECK (ok, bench_print ("Can't get student id=%d\n", STU_ID(i)); return;); } qps_sum += bench_ts_end (SQL_LKUP_COUNT); @@ -193,7 +204,7 @@ void bench_sql_test (void *pConn, int STU_COUNT, bool bRand, bench_result_t *pRe bench_print ("------------ %s UPDATE %s ------------\n", ORDER_STR(bRand), qps2str(UPD_COUNT)); bench_ts_beg(); for (int i = 0; i < UPD_COUNT; ++i) { - ok = bench_sql_updAge_byid (pConn, BENCH_SQL_UDPAGE_BYID, STU_ID(i), 10+i%20); + ok = bench_sql_updAge_byid (pConn, sql_upagebyid, STU_ID(i), 10+i%20); BENCH_CHECK (ok, bench_print ("Can't update student id=%d\n", STU_ID(i)); return;); } pResult->update_qps += bench_ts_end (UPD_COUNT); @@ -201,7 +212,7 @@ void bench_sql_test (void *pConn, int STU_COUNT, bool bRand, bench_result_t *pRe bench_print ("------------ %s DELETE %s ------------\n", ORDER_STR(bRand), qps2str(STU_COUNT)); bench_ts_beg(); for (int i = 0; i < STU_COUNT; ++i) { - ok = bench_sql_del_byid (pConn, BENCH_SQL_DEL_BYID, STU_ID(i)); + ok = bench_sql_del_byid (pConn, sql_delbyid, STU_ID(i)); BENCH_CHECK (ok, bench_print ("Can't delete student id=%d\n", STU_ID(i)); return;); } pResult->delete_qps += bench_ts_end (STU_COUNT); @@ -297,14 +308,16 @@ int main (int argc, char **argv) const char *db = ":memory:"; if (argc >= 2) { - while ((ch = getopt(argc, argv, "n:r:c:d:l:qjh")) != -1) { + while ((ch = getopt(argc, argv, "n:r:c:d:l:qjhs")) != -1) { switch (ch) { case 'h': printf ("Usage:\n"); printf (" -h show this help\n"); printf (" -n default 1000000\n"); printf (" -r test round, default 1\n"); + printf (" -d test on-disk db\n"); printf (" -c bind cpu core\n"); + printf (" -s test in server db mode\n"); printf (" -q quite mode\n"); return -1; case 'n': @@ -325,6 +338,9 @@ int main (int argc, char **argv) case 'j': bCharts = true; break; + case 's': + s_bench_svr = true; + break; case 'q': s_quiet = true; break; @@ -363,6 +379,9 @@ int main (int argc, char **argv) goto error; } + SQL_LKUP_COUNT = LKUP_COUNT/5; + UPD_COUNT = LKUP_COUNT/10; + bench_result_t result[4]; memset (&result, 0, sizeof(result)); @@ -372,12 +391,16 @@ int main (int argc, char **argv) bench_print ("\n******************** %10s Test *********************\n", "Sequential"); bench_sql_test (pConn, STU_COUNT, false, &result[0]); - bench_stmt_test (pConn, STU_COUNT, false, &result[1]); + if (!s_bench_svr) { + bench_stmt_test (pConn, STU_COUNT, false, &result[1]); + } bench_print ("\n\n********************* %10s Test *********************\n", "Random"); bench_sql_test (pConn, STU_COUNT, true, &result[2]); - bench_stmt_test (pConn, STU_COUNT, true, &result[3]); + if (!s_bench_svr) { + bench_stmt_test (pConn, STU_COUNT, true, &result[3]); + } } for (int i = 0; i < 4; ++i) { diff --git a/examples/c/example.c b/examples/c/example.c index 41aa9cd..deb5275 100644 --- a/examples/c/example.c +++ b/examples/c/example.c @@ -5,7 +5,8 @@ int main (int argc, char **argv) xdb_res_t *pRes; xdb_row_t *pRow; - xdb_conn_t *pConn = xdb_open (argc > 1 ? argv[1] : ":memory:"); + //xdb_conn_t *pConn = xdb_open (argc > 1 ? argv[1] : ":memory:"); + xdb_conn_t *pConn = xdb_connect (NULL, NULL, NULL, "memory", 7777); XDB_CHECK (NULL != pConn, printf ("failed to create DB\n"); return -1;); // Create Table diff --git a/include/crossdb.h b/include/crossdb.h index 7ee9217..1973f09 100644 --- a/include/crossdb.h +++ b/include/crossdb.h @@ -69,12 +69,7 @@ typedef enum { XDB_TYPE_VCHAR = 14, // varied-length string(at most 65535 byte) XDB_TYPE_VBINARY = 15, // varied-length binary(at most 65535 byte) XDB_TYPE_BOOL = 16, - // MAC,IPv4,IPv6,CIDR - //XDB_TYPE_DECIMAL = 16, // TBD decimal - //XDB_TYPE_GEOMETRY = 17, // TBD geometry - //XDB_TYPE_JSON = 18, // TBD json string - //XDB_TYPE_DYNAMIC = 20, - XDB_TYPE_MAX = 21 + XDB_TYPE_MAX = 32 } xdb_type_t; @@ -224,6 +219,9 @@ xdb_res_t* xdb_vbexec_cb (xdb_conn_t *pConn, xdb_row_callback callback, void *pArg, const char *sql, va_list ap); #endif +const void * +xdb_poll (xdb_conn_t *pConn, int *pLen, uint32_t timeout); + /************************************** Result diff --git a/src/admin/xdb_shell.c b/src/admin/xdb_shell.c index aa92a27..79036f6 100644 --- a/src/admin/xdb_shell.c +++ b/src/admin/xdb_shell.c @@ -25,7 +25,7 @@ xdb_print_char (FILE *pFile, char ch, int n) { int i; for (i = 0; i < n; ++i) { - fputc (ch, pFile); + xdb_fputc (ch, pFile); } } @@ -63,12 +63,12 @@ xdb_is_sql_complete (char *sql, bool split) XDB_STATIC void xdb_print_table_line (FILE *pFile, int *colLen, int n) { - fputc ('+', pFile); + xdb_fputc ('+', pFile); for (int i = 0; i < n; ++i) { xdb_print_char (pFile, '-', colLen[i]+2); - fputc ('+', pFile); + xdb_fputc ('+', pFile); } - fputc ('\n', pFile); + xdb_fputc ('\n', pFile); } XDB_STATIC void @@ -195,51 +195,51 @@ xdb_fprint_row_table (FILE *pFile, xdb_meta_t *pMeta, xdb_row_t *pRow, int *pCol } for (int n = 0; n < line; ++n) { - fputc ('|', pFile); + xdb_fputc ('|', pFile); for (int i = 0; i < pMeta->col_count; ++i) { int plen = 0; char *str = "", *ch = NULL; - fputc (' ', pFile); + xdb_fputc (' ', pFile); void *pVal = (void*)((uint64_t*)pRow[i]); if (NULL == pVal) { if (0 == n) { - plen = fprintf (pFile, "NULL"); + plen = xdb_fprintf (pFile, "NULL"); } } else { switch (pCol[i]->col_type) { case XDB_TYPE_INT: if (0 == n) { - plen = fprintf (pFile, "%d", *(int32_t*)pVal); + plen = xdb_fprintf (pFile, "%d", *(int32_t*)pVal); } break; case XDB_TYPE_BOOL: if (0 == n) { - plen = fprintf (pFile, "%s", *(int8_t*)pVal?"true":"false"); + plen = xdb_fprintf (pFile, "%s", *(int8_t*)pVal?"true":"false"); } break; case XDB_TYPE_TINYINT: if (0 == n) { - plen = fprintf (pFile, "%d", *(int8_t*)pVal); + plen = xdb_fprintf (pFile, "%d", *(int8_t*)pVal); } break; case XDB_TYPE_SMALLINT: if (0 == n) { - plen = fprintf (pFile, "%d", *(int16_t*)pVal); + plen = xdb_fprintf (pFile, "%d", *(int16_t*)pVal); } break; case XDB_TYPE_BIGINT: if (0 == n) { - plen = fprintf (pFile, "%"PRIi64, *(int64_t*)pVal); + plen = xdb_fprintf (pFile, "%"PRIi64, *(int64_t*)pVal); } break; case XDB_TYPE_FLOAT: if (0 == n) { - plen = fprintf (pFile, "%f", *(float*)pVal); + plen = xdb_fprintf (pFile, "%f", *(float*)pVal); } break; case XDB_TYPE_DOUBLE: if (0 == n) { - plen = fprintf (pFile, "%f", *(double*)pVal); + plen = xdb_fprintf (pFile, "%f", *(double*)pVal); } break; case XDB_TYPE_CHAR: @@ -262,14 +262,14 @@ xdb_fprint_row_table (FILE *pFile, xdb_meta_t *pMeta, xdb_row_t *pRow, int *pCol *ch = '\0'; } } - plen = fprintf (pFile, "%s", str); + plen = xdb_fprintf (pFile, "%s", str); break; case XDB_TYPE_BINARY: case XDB_TYPE_VBINARY: if (0 == n) { - plen += fprintf (pFile, "0x"); + plen += xdb_fprintf (pFile, "0x"); for (int h = 0; h < *(uint16_t*)(pVal-2); ++h) { - plen += fprintf (pFile, "%c%c", s_xdb_hex_2_str[((uint8_t*)pVal)[h]][0], s_xdb_hex_2_str[((uint8_t*)pVal)[h]][1]); + plen += xdb_fprintf (pFile, "%c%c", s_xdb_hex_2_str[((uint8_t*)pVal)[h]][0], s_xdb_hex_2_str[((uint8_t*)pVal)[h]][1]); } } break; @@ -288,7 +288,7 @@ xdb_fprint_row_table (FILE *pFile, xdb_meta_t *pMeta, xdb_row_t *pRow, int *pCol len += sprintf (buf+len, ".%03d", millsec/1000); } } - plen = fprintf (pFile, "%s", buf); + plen = xdb_fprintf (pFile, "%s", buf); } break; } @@ -296,12 +296,12 @@ xdb_fprint_row_table (FILE *pFile, xdb_meta_t *pMeta, xdb_row_t *pRow, int *pCol xdb_print_char (pFile, ' ', pColLen[i] + 1 - plen); - fputc ('|', pFile); + xdb_fputc ('|', pFile); if (ch != NULL) { *ch = '\n'; } } - fputc ('\n', pFile); + xdb_fputc ('\n', pFile); } } @@ -326,14 +326,14 @@ xdb_output_table (xdb_conn_t *pConn, xdb_res_t *pRes) xdb_rewind_result (pRes); xdb_print_table_line (pConn->conn_stdout, colLen, count); - fputc ('|', pConn->conn_stdout); + xdb_fputc ('|', pConn->conn_stdout); for (int i = 0; i < count; ++i) { - fputc (' ', pConn->conn_stdout); - fprintf (pConn->conn_stdout, "%s", pCol[i]->col_name); + xdb_fputc (' ', pConn->conn_stdout); + xdb_fprintf (pConn->conn_stdout, "%s", pCol[i]->col_name); xdb_print_char (pConn->conn_stdout, ' ', colLen[i] + 1 - pCol[i]->col_nmlen); - fputc ('|', pConn->conn_stdout); + xdb_fputc ('|', pConn->conn_stdout); } - fputc ('\n', pConn->conn_stdout); + xdb_fputc ('\n', pConn->conn_stdout); xdb_print_table_line (pConn->conn_stdout, colLen, count); @@ -414,6 +414,27 @@ xdb_shell_add_tbl (xdb_conn_t* pConn, const char *prefix, crossline_completions_ return bFound; } +XDB_STATIC bool +xdb_shell_add_svr (xdb_conn_t* pConn, const char *prefix, crossline_completions_t *pCompletion) +{ + bool bFound = true; + xdb_row_t *pRow; + + xdb_res_t *pRes = xdb_pexec (pConn, "SELECT server FROM system.servers WHERE server='%s'", xdb_curdb(pConn), prefix); + if (0 == pRes->row_count) { + bFound = false; + xdb_free_result (pRes); + pRes = xdb_pexec (pConn, "SELECT server FROM system.servers", xdb_curdb(pConn)); + while (NULL != (pRow = xdb_fetch_row (pRes))) { + if (! strncasecmp(prefix, (char*)pRow[0], strlen(prefix))) { + crossline_completion_add (pCompletion, (char*)pRow[0], NULL); + } + } + } + xdb_free_result (pRes); + return bFound; +} + XDB_STATIC bool xdb_shell_add_idx (xdb_conn_t* pConn, const char *prefix, crossline_completions_t *pCompletion) { @@ -585,7 +606,7 @@ xdb_shell_completion_hook (char const *buf, crossline_completions_t *pCompletion xdb_shell_completion_filter (pConn, pCompletion, &token, token.token, fitlercommands); } } else if (! strcasecmp (token.token, "SHOW")) { - static const char *commands[] = {"DATABASES", "TABLES", "INDEXES", "COLUMNS", "CREATE", NULL}; + static const char *commands[] = {"DATABASES", "TABLES", "INDEXES", "COLUMNS", "SERVERS", "CREATE", NULL}; xdb_next_token (&token); if (sql_add_completion (pCompletion, token.token, commands, NULL) >= 0) { goto exit; @@ -619,7 +640,7 @@ xdb_shell_completion_hook (char const *buf, crossline_completions_t *pCompletion xdb_shell_completion_filter (pConn, pCompletion, &token, token.token, fitlercommands); } } else if (! strcasecmp (token.token, "DROP")) { - static const char *commands[] = {"DATABASE", "TABLE", "INDEX", NULL}; + static const char *commands[] = {"DATABASE", "TABLE", "INDEX", "SERVER", NULL}; xdb_next_token (&token); if (sql_add_completion (pCompletion, token.token, commands, NULL) >= 0) { goto exit; @@ -650,6 +671,11 @@ xdb_shell_completion_hook (char const *buf, crossline_completions_t *pCompletion goto exit; } } + } else if (! strcasecmp (token.token, "SERVER")) { + xdb_next_token (&token); + if (!xdb_shell_add_svr (pConn, token.token, pCompletion)) { + goto exit; + } } } else if (! strcasecmp (token.token, "DESC") || ! strcasecmp (token.token, "DESCRIBE")) { xdb_next_token (&token); @@ -795,11 +821,11 @@ xdb_shell_process (xdb_conn_t *pConn, const char *sql) } XDB_STATIC int -xdb_shell_loop (xdb_conn_t* pConn, const char *prompt, const char *db) +xdb_shell_loop (xdb_conn_t* pConn, const char *prompt, const char *db, bool bQuite) { bool bNewConn = false; - if (isatty(STDIN_FILENO)) { + if (!bQuite && isatty(STDIN_FILENO)) { xdb_print (s_xdb_banner, xdb_version()); crossline_color_set (CROSSLINE_FGCOLOR_YELLOW); diff --git a/src/admin/xdb_shell.h b/src/admin/xdb_shell.h index dc6094b..50083bf 100644 --- a/src/admin/xdb_shell.h +++ b/src/admin/xdb_shell.h @@ -13,7 +13,7 @@ #define __XDB_SHELL_H__ XDB_STATIC int -xdb_shell_loop (xdb_conn_t* pConn, const char *prompt, const char *db); +xdb_shell_loop (xdb_conn_t* pConn, const char *prompt, const char *db, bool bQuite); XDB_STATIC void xdb_output_table (xdb_conn_t *pConn, xdb_res_t *pRes); diff --git a/src/core/xdb_cfg.h b/src/core/xdb_cfg.h index ca850f0..b1b13c1 100644 --- a/src/core/xdb_cfg.h +++ b/src/core/xdb_cfg.h @@ -16,7 +16,7 @@ CrossDB Config ******************************************************************************/ -#define XDB_VERSION "0.10.0" +#define XDB_VERSION "0.11.0" #define XDB_MAX_DB 1024 // at most 4096 #define XDB_MAX_TBL 4095 // per DB @@ -30,9 +30,12 @@ #define XDB_PATH_LEN 512 - #ifndef XDB_ENABLE_SERVER -#define XDB_ENABLE_SERVER 0 +#define XDB_ENABLE_SERVER 1 +#endif + +#ifndef XDB_ENABLE_PUBSUB +#define XDB_ENABLE_PUBSUB 0 #endif #endif // __CROSS_CFG_H__ diff --git a/src/core/xdb_common.h b/src/core/xdb_common.h index 426a8b3..a183ef3 100644 --- a/src/core/xdb_common.h +++ b/src/core/xdb_common.h @@ -78,6 +78,9 @@ typedef int xdb_rowid; #define XDB_CONNCODE(pConn) pConn->conn_res.errcode //#define XDB_LOG_FLAGS (XDB_LOG_DB|XDB_LOG_TBL|XDB_LOG_TRANS|XDB_LOG_WAL) +//#define XDB_LOG_FLAGS XDB_LOG_DB +//#define XDB_LOG_FLAGS XDB_LOG_SVR +//#define XDB_LOG_FLAGS XDB_LOG_PUBSUB #ifndef XDB_LOG_FLAGS #define XDB_LOG_FLAGS 0 #endif @@ -92,6 +95,9 @@ typedef int xdb_rowid; #define XDB_LOG_TRANS (1<<10) #define XDB_LOG_WAL (1<<11) #define XDB_LOG_VDAT (1<<12) +#define XDB_LOG_SVR (1<<13) +#define XDB_LOG_BINLOG (1<<14) +#define XDB_LOG_PUBSUB (1<<15) #define XDB_IS_NOTNULL(pNull,bits) (((uint8_t*)(pNull))[bits>>3] & (1<<(bits&7))) #define XDB_SET_NOTNULL(pNull,bits) (((uint8_t*)(pNull))[bits>>3] |= (1<<(bits&7))) @@ -154,7 +160,7 @@ typedef struct { uint16_t *pFldMap; xdb_rowptr_t *pRowList; xdb_rowptr_t rowlist[XDB_ROWLIST_CNT]; - xdb_bmp_t *pBmp; + xdb_bmp_t *pBmp, bmp; //uint8_t buf[xxx]; // if no lock, then cache result ? } xdb_rowset_t; @@ -167,5 +173,6 @@ int xdb_exit (); static xdb_type_t s_xdb_prompt_type[]; +static bool s_xdb_cli; #endif // __XDB_COMMON_H__ diff --git a/src/core/xdb_conn.c b/src/core/xdb_conn.c index 943bb0a..3973c07 100644 --- a/src/core/xdb_conn.c +++ b/src/core/xdb_conn.c @@ -61,7 +61,7 @@ xdb_close (xdb_conn_t* pConn) xdb_rollback (pConn); - if (pConn->conn_stdout != stdout) { + if ((pConn->conn_stdout != stdout) && ((uintptr_t)pConn->conn_stdout != pConn->sockfd)) { fclose (pConn->conn_stdout); } else if (pConn->sockfd > 0) { xdb_sock_close (pConn->sockfd); diff --git a/src/core/xdb_conn.h b/src/core/xdb_conn.h index 0a1b537..9b6926e 100644 --- a/src/core/xdb_conn.h +++ b/src/core/xdb_conn.h @@ -48,7 +48,7 @@ typedef struct xdb_conn_t { xdb_conn_t *pConn; // pConn must be before conn_res xdb_res_t conn_res; - xdb_msg_t conn_msg; + xdb_msg_t conn_msg; // conn_msg must be after conn_res int sockfd; FILE *conn_stdout; @@ -83,6 +83,9 @@ typedef struct xdb_conn_t { bool conn_client; xdb_format_t res_format; + + char *poll_buf; + uint32_t poll_size; } xdb_conn_t; XDB_STATIC void diff --git a/src/core/xdb_crud.c b/src/core/xdb_crud.c index e9cca8e..496d48b 100644 --- a/src/core/xdb_crud.c +++ b/src/core/xdb_crud.c @@ -1696,10 +1696,9 @@ XDB_STATIC int xdb_sql_scan (xdb_conn_t *pConn, xdb_tblm_t *pTblm, xdb_rowset_t *pRowSet, xdb_reftbl_t *pRefTbl) { if (xdb_likely (pRefTbl->bUseIdx)) { - xdb_bmp_t rows_bmp; if (xdb_unlikely (pRefTbl->or_count > 1)) { - xdb_bmp_init (&rows_bmp); - pRowSet->pBmp = &rows_bmp; + pRowSet->pBmp = &pRowSet->bmp; + xdb_bmp_init (pRowSet->pBmp); } for (int i = 0; i < pRefTbl->or_count; ++i) { xdb_idxfilter_t *pIdxFilter = pRefTbl->or_list[i].pIdxFilter; @@ -1976,7 +1975,13 @@ xdb_queryres_alloc (xdb_conn_t *pConn, xdb_size size) pQueryRes->pStmt = NULL; pQueryRes->buf_len = size; pConn->pQueryRes = pQueryRes; + } else if (pQueryRes->buf_len < size) { + pQueryRes = xdb_realloc (pQueryRes, size); + XDB_EXPECT2 (NULL != pQueryRes); + pQueryRes->buf_len = size; + pConn->pQueryRes = pQueryRes; } + pQueryRes->buf_free = pQueryRes->buf_len - (sizeof (*pQueryRes) + 4); // last 4 is end of row (len = 0) return NULL; @@ -2275,10 +2280,12 @@ xdb_sprint_field (xdb_field_t *pField, void *pRow, char *buf) return len; } -static int -xdb_dbrow_log (xdb_tblm_t *pTblm, uint32_t type, void *pNewRow, void *pOldRow, xdb_setfld_t set_flds[], int set_count) +XDB_STATIC int +xdb_dbrow_log (xdb_tblm_t *pTblm, uint32_t type, void *pNewRow, void *pOldRow, xdb_setfld_t *set_flds, int set_count) { +#if (XDB_ENABLE_PUBSUB == 0) return 0; +#endif int len = 0; char buf[32768]; @@ -2291,7 +2298,7 @@ xdb_dbrow_log (xdb_tblm_t *pTblm, uint32_t type, void *pNewRow, void *pOldRow, x switch (type) { case XDB_TRIG_AFT_INS: - len = sprintf (buf, "INSERT INTO %s (", XDB_OBJ_NAME(pTblm)); + len = sprintf (buf, "INSERT INTO %s.%s (", XDB_OBJ_NAME(pTblm->pDbm), XDB_OBJ_NAME(pTblm)); for (int i = 0; i < pTblm->fld_count; ++i) { pField = &pTblm->pFields[i]; memcpy (buf+len, pField->obj.obj_name, pField->obj.nm_len); @@ -2309,7 +2316,7 @@ xdb_dbrow_log (xdb_tblm_t *pTblm, uint32_t type, void *pNewRow, void *pOldRow, x buf[++len] = '\0'; break; case XDB_TRIG_AFT_UPD: - len = sprintf (buf, "UPDATE %s SET ", XDB_OBJ_NAME(pTblm)); + len = sprintf (buf, "UPDATE %s.%s SET ", XDB_OBJ_NAME(pTblm->pDbm), XDB_OBJ_NAME(pTblm)); for (int i = 0; i < set_count; ++i) { pField = set_flds[i].pField; memcpy (buf+len, pField->obj.obj_name, pField->obj.nm_len); @@ -2323,7 +2330,7 @@ xdb_dbrow_log (xdb_tblm_t *pTblm, uint32_t type, void *pNewRow, void *pOldRow, x // fall through case XDB_TRIG_AFT_DEL: if (XDB_TRIG_AFT_DEL == type) { - len = sprintf (buf, "DELETE FROM %s WHERE ", XDB_OBJ_NAME(pTblm)); + len = sprintf (buf, "DELETE FROM %s.%s WHERE ", XDB_OBJ_NAME(pTblm->pDbm), XDB_OBJ_NAME(pTblm)); } if (pTblm->bPrimary) { xdb_idxm_t *pIdxm = XDB_OBJM_GET(pTblm->idx_objm, 0); @@ -2344,11 +2351,15 @@ xdb_dbrow_log (xdb_tblm_t *pTblm, uint32_t type, void *pNewRow, void *pOldRow, x buf[len++] = '='; len += xdb_sprint_field (pField, (void*)pOldRow, buf+len); } - buf[len++] = ';'; + buf[len++] = '\n'; buf[len++] = '\0'; } - printf ("DBLOG: %s\n", buf); + printf ("DBLOG %d: %s\n", len, buf); +#if (XDB_ENABLE_PUBSUB == 1) + xdb_pub_notify (buf, len); +#endif + return 0; } @@ -2500,7 +2511,7 @@ xdb_row_delete (xdb_conn_t *pConn, xdb_tblm_t *pTblm, xdb_rowid rid, void *pRow, } XDB_STATIC int -xdb_row_update (xdb_conn_t *pConn, xdb_tblm_t *pTblm, xdb_rowid rid, void *pRow, xdb_setfld_t set_flds[], int set_count) +xdb_row_update (xdb_conn_t *pConn, xdb_tblm_t *pTblm, xdb_rowid rid, void *pRow, xdb_setfld_t *set_flds, int set_count) { uint64_t idx_bmp = 0; XDB_BUF_DEF(pUpdRow, 4096); diff --git a/src/core/xdb_db.c b/src/core/xdb_db.c index 3e686f7..af904ba 100644 --- a/src/core/xdb_db.c +++ b/src/core/xdb_db.c @@ -39,13 +39,14 @@ xdb_use_db (xdb_stmt_db_t *pStmt) XDB_STATIC int xdb_open_datadir (xdb_stmt_db_t *pStmt) { - char *datadir = pStmt->db_name; + char datadir[XDB_PATH_LEN + 1];; DIR *d = NULL; struct dirent *dp = NULL; struct stat st; - char dbpath[XDB_PATH_LEN + 1] = {0}; + char dbpath[XDB_PATH_LEN + 256 + 1] = {0}; int rc; + xdb_strcpy (datadir, pStmt->db_name); if(stat(datadir, &st) < 0 || !S_ISDIR(st.st_mode)) { xdb_errlog ("Invalid datadir: %s\n", datadir); return XDB_E_NOTFOUND; @@ -61,6 +62,7 @@ xdb_open_datadir (xdb_stmt_db_t *pStmt) continue; } snprintf (dbpath, sizeof(dbpath), "%s/%s", datadir, dp->d_name); + xdb_dblog ("Open DB '%s'\n", dbpath); // snprintf on Linux will force last '\0', while Windows doesn't dbpath[sizeof(dbpath)-1] = '\0'; rc = stat (dbpath, &st); @@ -97,17 +99,17 @@ XDB_STATIC int xdb_create_db (xdb_stmt_db_t *pStmt) { xdb_conn_t *pConn = pStmt->pConn; + xdb_dbm_t *pDbm = NULL; if (NULL != pStmt->pDbm) { if (NULL == pConn->pCurDbm) { - pConn->pCurDbm = pStmt->pDbm; + XDB_EXPECT (strlen(XDB_OBJ_NAME(pStmt->pDbm)) <= XDB_NAME_LEN, XDB_E_STMT, "DB name too long"); xdb_strcpy (pConn->cur_db, XDB_OBJ_NAME(pStmt->pDbm)); + pConn->pCurDbm = pStmt->pDbm; } return XDB_OK; } - xdb_dbm_t *pDbm = NULL; - char db_path[XDB_PATH_LEN + XDB_NAME_LEN + 1]; char *real_db_name = pStmt->db_name; char *db_name = strrchr (real_db_name, '/'); @@ -162,6 +164,8 @@ xdb_create_db (xdb_stmt_db_t *pStmt) pDbm->lock_mode = pStmt->lock_mode; pDbm->sync_mode = pStmt->sync_mode; + XDB_RWLOCK_INIT(pDbm->db_lock); + xdb_db_t *pDb = (xdb_db_t*)pDbm->stg_mgr.pStgHdr; XDB_EXPECT (strlen(real_db_name) < sizeof (pDbm->db_path), XDB_E_PARAM, "Too long path"); @@ -183,17 +187,20 @@ xdb_create_db (xdb_stmt_db_t *pStmt) if (!pDbm->bMemory) { xdb_sprintf (path, "%s/xdb.xql", real_db_name); if (xdb_fexist (path)) { + xdb_dbm_t* pCurDbm = pConn->pCurDbm; + pConn->pCurDbm = pDbm; xdb_source (pConn, path); + pConn->pCurDbm = pCurDbm; } } if ((pDb->flush_time < xdb_timestamp()-xdb_uptime()) && (pDb->lastchg_id != pDb->flush_id)) { - #ifdef XDB_CLI - xdb_errlog ("'%s' is broken, run 'REPAIR DATABASE to repaire DB'\n", XDB_OBJ_NAME(pDbm)); - #else - xdb_errlog ("Database '%s' was broken, repaire now\n", XDB_OBJ_NAME(pDbm)); - xdb_repair_db (pDbm, 0); - #endif + if (s_xdb_cli) { + xdb_errlog ("'%s' is broken, run 'REPAIR DATABASE to repaire DB'\n", XDB_OBJ_NAME(pDbm)); + } else { + xdb_errlog ("Database '%s' was broken, repaire now\n", XDB_OBJ_NAME(pDbm)); + xdb_repair_db (pDbm, 0); + } } if (strcmp (XDB_OBJ_NAME(pDbm), "system")) { @@ -343,10 +350,13 @@ xdb_flush_db (xdb_dbm_t *pDbm, uint32_t flags) if (NULL == pDbm || pDbm->bMemory) { return XDB_OK; } + + xdb_wrlock_db (pDbm); + xdb_db_t *pDb = XDB_DBPTR(pDbm); if (pDb->flush_id == pDb->lastchg_id) { - return XDB_OK; + goto exit; } xdb_dblog ("Flush DB '%s' %"PRIu64" -> %"PRIu64"\n", XDB_OBJ_NAME(pDbm), pDb->flush_id, pDb->lastchg_id); @@ -382,6 +392,8 @@ xdb_flush_db (xdb_dbm_t *pDbm, uint32_t flags) xdb_stg_sync (&pDbm->pWalmBak->stg_mgr, 0, 0, false); } +exit: + xdb_wrunlock_db (pDbm); return 0; } diff --git a/src/core/xdb_db.h b/src/core/xdb_db.h index a729501..49cad67 100644 --- a/src/core/xdb_db.h +++ b/src/core/xdb_db.h @@ -27,6 +27,7 @@ typedef struct xdb_dbm_t { xdb_walm_t *pWalmBak; xdb_rwlock_t wal_lock; + xdb_rwlock_t db_lock; } xdb_dbm_t; typedef struct xdb_dbobj_t { diff --git a/src/core/xdb_sql.c b/src/core/xdb_sql.c index fc86f49..185b273 100644 --- a/src/core/xdb_sql.c +++ b/src/core/xdb_sql.c @@ -123,7 +123,25 @@ xdb_stmt_exec (xdb_stmt_t *pStmt) case XDB_STMT_CREATE_SVR: rc = xdb_create_server ((xdb_stmt_svr_t*)pStmt); break; + case XDB_STMT_DROP_SVR: + rc = xdb_drop_server ((xdb_stmt_svr_t*)pStmt); + break; + #endif + + #if (XDB_ENABLE_PUBSUB == 1) + case XDB_STMT_CREATE_PUB: + rc = xdb_create_pub ((xdb_stmt_pub_t*)pStmt); + break; + + case XDB_STMT_CREATE_SUB: + rc = xdb_create_sub ((xdb_stmt_sub_t*)pStmt); + break; + + case XDB_STMT_SUBSCRIBE: + rc = xdb_subscribe ((xdb_stmt_subscribe_t*)pStmt); + break; #endif + case XDB_STMT_USE_DB: rc = xdb_use_db ((xdb_stmt_db_t*)pStmt); if (XDB_OK == rc) { @@ -196,6 +214,9 @@ xdb_stmt_exec (xdb_stmt_t *pStmt) case XDB_STMT_SHOW_IDX: pRes = xdb_pexec (pStmt->pConn, "SELECT table,idx_key,type,col_list FROM system.indexes WHERE database='%s'", XDB_OBJ_NAME(pConn->pCurDbm)); break; + case XDB_STMT_SHOW_SVR: + pRes = xdb_pexec (pStmt->pConn, "SELECT * FROM system.servers"); + break; case XDB_STMT_DESC: { xdb_stmt_tbl_t *pStmtTbl = (xdb_stmt_tbl_t*)pStmt; @@ -207,7 +228,7 @@ xdb_stmt_exec (xdb_stmt_t *pStmt) { char cur_db[XDB_NAME_LEN+2]; xdb_strcpy (cur_db, xdb_curdb(pConn)); - rc = xdb_shell_loop (pStmt->pConn, NULL, xdb_curdb(pConn)); + rc = xdb_shell_loop (pStmt->pConn, NULL, xdb_curdb(pConn), true); // reover current db if (*cur_db != '\0') { pRes = xdb_pexec (pStmt->pConn, "USE %s", cur_db); @@ -244,9 +265,12 @@ xdb_stmt_exec (xdb_stmt_t *pStmt) // extra msg set if (xdb_unlikely (pConn->conn_msg.len > 0)) { - pRes->data_len = sizeof (xdb_msg_t) + pConn->conn_msg.len + 1; + pRes->data_len = sizeof (xdb_msg_t) - sizeof(pConn->conn_msg.msg) + pConn->conn_msg.len + 1; pRes->row_data = (uintptr_t)pConn->conn_msg.msg; } + if (rc != XDB_OK) { + XDB_SETERR (rc, "Failed"); + } } pRes->stmt_type = pStmt->stmt_type; @@ -453,11 +477,39 @@ xdb_vbexec (xdb_conn_t *pConn, const char *sql, va_list ap) xdb_res_t* xdb_bexec (xdb_conn_t *pConn, const char *sql, ...) { + if (xdb_unlikely (pConn->conn_client)) { + va_list vargs; + + XDB_BUF_DEF(pSql,4096); + + va_start(vargs, sql); + int len = vsnprintf ((char*)pSql, pSql_size-1, sql, vargs); + va_end(vargs); + + if (len >= pSql_size) { + XDB_BUF_ALLOC(pSql,len+1); + XDB_EXPECT (NULL != pSql, XDB_E_MEMORY, "Can't alloc memory"); + va_start(vargs, sql); + len = vsnprintf (pSql, len, sql, vargs); + va_end(vargs); + } + pSql[pSql_size-1] = '\0'; + + xdb_res_t *pRes = xdb_exec (pConn, pSql); + + XDB_BUF_FREE(pSql); + + return pRes; + } + va_list ap; va_start (ap, sql); xdb_res_t* pRes = xdb_vbexec (pConn, sql, ap); va_end (ap); return pRes; + +error: + return &pConn->conn_res; } XDB_STATIC int @@ -519,25 +571,25 @@ xdb_exec_out (xdb_conn_t *pConn, const char *sql, int len) default: { if (pRes->errcode > 0) { - fprintf (pConn->conn_stdout, "ERROR %d: %s\n\n", pRes->errcode, xdb_errmsg(pRes)); + xdb_fprintf (pConn->conn_stdout, "ERROR %d: %s\n\n", pRes->errcode, xdb_errmsg(pRes)); } else { if (pRes->row_count) { xdb_output_table (pConn, pRes); } if (pRes->meta_len > 0) { - fprintf (pConn->conn_stdout, "%"PRId64" row%s in set (%d.%03d ms)\n\n", pRes->row_count, pRes->row_count>1?"s":"", (int)(ts/1000), (int)(ts%1000)); + xdb_fprintf (pConn->conn_stdout, "%"PRId64" row%s in set (%d.%03d ms)\n\n", pRes->row_count, pRes->row_count>1?"s":"", (int)(ts/1000), (int)(ts%1000)); } else if (1 == pRes->stmt_type) { - fprintf (pConn->conn_stdout, "Database changed\n\n"); + xdb_fprintf (pConn->conn_stdout, "Database changed\n\n"); } else if (pRes->stmt_type < 200) { if (isatty(STDIN_FILENO)) { if (pRes->data_len) { - fprintf (pConn->conn_stdout, "%s\n", xdb_errmsg(pRes)); + xdb_fprintf (pConn->conn_stdout, "%s\n", xdb_errmsg(pRes)); } - fprintf (pConn->conn_stdout, "Query OK, %" PRId64 " row%s affected (%d.%03d ms)\n\n", pRes->affected_rows, pRes->affected_rows>1?"s":"", (int)(ts/1000), (int)(ts%1000)); + xdb_fprintf (pConn->conn_stdout, "Query OK, %" PRId64 " row%s affected (%d.%03d ms)\n\n", pRes->affected_rows, pRes->affected_rows>1?"s":"", (int)(ts/1000), (int)(ts%1000)); } } } - fflush (pConn->conn_stdout); + xdb_fflush (pConn->conn_stdout); } break; } diff --git a/src/core/xdb_sysdb.c b/src/core/xdb_sysdb.c index 00054a8..2294293 100644 --- a/src/core/xdb_sysdb.c +++ b/src/core/xdb_sysdb.c @@ -174,6 +174,31 @@ xdb_sysdb_del_tbl (xdb_tblm_t *pTblm) XDB_RESCHK(pRes); } +XDB_STATIC void +xdb_sysdb_add_svr (xdb_server_t *pSvr) +{ + if (!s_xdb_bInit) { + return; + } + xdb_res_t *pRes = xdb_pexec (s_xdb_sysdb_pConn, "INSERT INTO servers (server,port) VALUES('%s',%u)", + XDB_OBJ_NAME(pSvr), pSvr->svr_port); + if (pRes->errcode != XDB_E_NOTFOUND) { + XDB_RESCHK(pRes, xdb_errlog("Can't add to system table servers %s\n", XDB_OBJ_NAME(pSvr))); + } +} + +XDB_STATIC void +xdb_sysdb_del_svr (xdb_server_t *pSvr) +{ + if (!s_xdb_bInit) { + return; + } + xdb_res_t *pRes = xdb_pexec (s_xdb_sysdb_pConn, "DELETE FROM servers WHERE server='%s'", XDB_OBJ_NAME(pSvr)); + if (pRes->errcode != XDB_E_NOTFOUND) { + XDB_RESCHK(pRes, xdb_errlog("Can't del from system table servers %s\n", XDB_OBJ_NAME(pSvr))); + } +} + XDB_STATIC int xdb_sysdb_init () { @@ -217,6 +242,9 @@ xdb_sysdb_init () pRes = xdb_exec (pConn, "CREATE TABLE IF NOT EXISTS databases (database CHAR(64), engine CHAR(8), data_path VARCHAR(1024), PRIMARY KEY (database))"); XDB_RESCHK(pRes, xdb_errlog ("Can't create system table databases\n")); + pRes = xdb_exec (pConn, "CREATE TABLE IF NOT EXISTS servers (server CHAR(64), port INT)"); + XDB_RESCHK(pRes, xdb_errlog ("Can't create system table databases\n")); + xdb_sysdb_add_db (pConn->pCurDbm); //xdb_exec (pConn, "CREATE DATABASE IF NOT EXISTS information_schema ENGINE=MEMORY"); diff --git a/src/core/xdb_table.c b/src/core/xdb_table.c index 6f68c88..f2b21e6 100644 --- a/src/core/xdb_table.c +++ b/src/core/xdb_table.c @@ -73,6 +73,8 @@ xdb_create_table (xdb_stmt_tbl_t *pStmt) return XDB_OK; } + xdb_wrlock_db (pDbm); + XDB_EXPECT (XDB_OBJM_COUNT (pDbm->db_objm) < XDB_MAX_TBL, XDB_E_FULL, "Can create at most %d tables", XDB_MAX_TBL); pTblm = xdb_calloc (sizeof(xdb_tblm_t)); @@ -219,9 +221,12 @@ xdb_create_table (xdb_stmt_tbl_t *pStmt) xdb_gen_db_schema (pTblm->pDbm); } + xdb_wrunlock_db (pDbm); + return XDB_OK; error: + xdb_wrunlock_db (pDbm); xdb_free (pTblm); return -pConn->conn_res.errcode; } @@ -271,10 +276,12 @@ xdb_close_table (xdb_tblm_t *pTblm) XDB_STATIC int xdb_drop_table (xdb_tblm_t *pTblm) { - xdb_tbllog ("Drop Table '%s'\n", XDB_OBJ_NAME(pTblm)); - xdb_dbm_t* pDbm = pTblm->pDbm; + xdb_wrlock_db (pDbm); + + xdb_tbllog ("Drop Table '%s'\n", XDB_OBJ_NAME(pTblm)); + xdb_sysdb_del_tbl (pTblm); int count = XDB_OBJM_MAX(pTblm->idx_objm); @@ -302,6 +309,8 @@ xdb_drop_table (xdb_tblm_t *pTblm) xdb_gen_db_schema (pDbm); + xdb_wrunlock_db (pDbm); + return XDB_OK; } diff --git a/src/core/xdb_trans.c b/src/core/xdb_trans.c index eb93e5a..065d256 100644 --- a/src/core/xdb_trans.c +++ b/src/core/xdb_trans.c @@ -374,8 +374,7 @@ int xdb_rollback (xdb_conn_t *pConn) { if (xdb_unlikely (pConn->conn_client)) { - xdb_res_t *pRes = xdb_exec (pConn, "ROLLBACK"); - return -pRes->errcode; + return XDB_OK; } if (xdb_unlikely (!pConn->bInTrans)) { @@ -447,7 +446,13 @@ void* xdb_bg_task (void *data) sleep (10); while (s_xdb_bInit) { sleep (s_xdb_flush_period); + if (!s_xdb_bInit) { + break; + } s_xdb_bg_run++; + if (!s_xdb_bInit) { + break; + } for (int i = 0; i < XDB_OBJM_MAX(s_xdb_db_list); ++i) { xdb_dbm_t *pDbm = XDB_OBJM_GET(s_xdb_db_list, i); xdb_db_t *pDb = XDB_DBPTR(pDbm); diff --git a/src/core/xdb_trans.h b/src/core/xdb_trans.h index abedec3..09e7f3c 100644 --- a/src/core/xdb_trans.h +++ b/src/core/xdb_trans.h @@ -93,6 +93,50 @@ xdb_wrunlock_tblstg (xdb_tblm_t *pTblm) return XDB_OK; } +static inline int +xdb_rdlock_db (xdb_dbm_t *pDbm) +{ + if (XDB_LOCK_PROCESS == pDbm->lock_mode) { + return xdb_file_rdlock (pDbm->stg_mgr.stg_fd, 0, 1); + } else { + xdb_rwlock_rdlock (&pDbm->db_lock); + } + return XDB_OK; +} + +static inline int +xdb_rdunlock_db (xdb_dbm_t *pDbm) +{ + if (XDB_LOCK_PROCESS == pDbm->lock_mode) { + return xdb_file_unlock (pDbm->stg_mgr.stg_fd, 0, 1); + } else { + xdb_rwlock_rdunlock (&pDbm->db_lock); + } + return XDB_OK; +} + +static inline int +xdb_wrlock_db (xdb_dbm_t *pDbm) +{ + if (XDB_LOCK_PROCESS == pDbm->lock_mode) { + return xdb_file_wrlock (pDbm->stg_mgr.stg_fd, 0, 1); + } else { + xdb_rwlock_wrlock (&pDbm->db_lock); + } + return XDB_OK; +} + +static inline int +xdb_wrunlock_db (xdb_dbm_t *pDbm) +{ + if (XDB_LOCK_PROCESS == pDbm->lock_mode) { + return xdb_file_unlock (pDbm->stg_mgr.stg_fd, 0, 1); + } else { + xdb_rwlock_wrunlock (&pDbm->db_lock); + } + return XDB_OK; +} + XDB_STATIC void xdb_tbltrans_init (xdb_tblTrans_t *pTblRows); diff --git a/src/core/xdb_wal.c b/src/core/xdb_wal.c index b9832ff..0ce659b 100644 --- a/src/core/xdb_wal.c +++ b/src/core/xdb_wal.c @@ -259,12 +259,12 @@ xdb_trans_db_wal (uint32_t did, void *pArg) if (pDbm->sync_mode > 0) { // include endof commit_len=0 if (pWal->commit_id - pWal->sync_cid >= pDbm->sync_mode) { - xdb_stg_sync (&pDbm->stg_mgr, pWal->commit_size - sizeof(xdb_commit_t), flush_size, false); + xdb_stg_sync (&pDbm->pWalm->stg_mgr, pWal->sync_size - sizeof(xdb_commit_t), flush_size, false); pWal->commit_size += pCommit->commit_len; pWal->sync_size = pWal->commit_size; pWal->sync_cid = pCommit->commit_id; } else { - xdb_stg_sync (&pDbm->stg_mgr, pWal->commit_size - sizeof(xdb_commit_t), flush_size, true); + xdb_stg_sync (&pDbm->pWalm->stg_mgr, pWal->sync_size - sizeof(xdb_commit_t), flush_size, true); pWal->commit_size += pCommit->commit_len; } } else { @@ -313,7 +313,7 @@ xdb_wal_flush (xdb_walm_t *pWalm) return XDB_OK; } -XDB_STATIC xdb_commit_id +XDB_STATIC uint64_t xdb_wal_iterate (xdb_walm_t *pWalm, xdb_wal_callbck cb_func, void *pArg) { xdb_wal_t *pWal = XDB_WAL_PTR(pWalm); @@ -322,7 +322,7 @@ xdb_wal_iterate (xdb_walm_t *pWalm, xdb_wal_callbck cb_func, void *pArg) xdb_tblm_t *pTblm; xdb_dbm_t *pDbm = pWalm->pDbm; uint64_t commit_size = sizeof (xdb_wal_t); - xdb_commit_id last_commit_id = pWal->commit_id; + uint64_t last_commit_id = pWal->commit_id; void *pWalEnd = (void*)pWal + pWal->commit_size; // Go over until end or checksum error @@ -394,9 +394,11 @@ xdb_wal_redo_row (xdb_tblm_t *pTblm, xdb_walrow_t *pWalRow, void *pArg) int vlen = pWalRow->row_len - sizeof(xdb_walrow_t) - pTblm->row_size - 4; if (vlen > 0) { uint32_t *pVdat = xdb_row_vdata_get (pTblm, pDbRow); - memcpy ((void*)pVdat + 4, pRow + pTblm->row_size, vlen); - // b1000 - *pVdat = (8<row_size, vlen); + // b1000 + *pVdat = (8<row_size); diff --git a/src/core/xdb_wal.h b/src/core/xdb_wal.h index d2fe365..b0aefed 100644 --- a/src/core/xdb_wal.h +++ b/src/core/xdb_wal.h @@ -12,19 +12,30 @@ #ifndef __XDB_WAL_H__ #define __XDB_WAL_H__ -typedef uint32_t xdb_commit_id; +#if 0 +#define XDB_WAL_LENBITS 28 +#define XDB_WAL_LENMASK ((1< 2048) { XDB_BUF_ALLOC (str, len); @@ -104,13 +108,38 @@ xdb_strcasehash(const char *key, int len) return -1; } } - for (int i = 0; i< len; ++i) { + for (int i = 0; i < len; ++i) { str[i] = tolower(key[i]); } - uint64_t hash = xdb_wyhash (str, len); XDB_BUF_FREE(str); return hash; } + +static int xdb_fprintf (FILE *pFile, const char *format, ...) +{ + int len; + va_list ap; + va_start(ap, format); + + if ((uintptr_t)pFile > 0xffff) { + len = vfprintf (pFile, format, ap); + } else { + len = xdb_sock_vprintf ((uintptr_t)pFile, format, ap); + } + + va_end(ap); + + return len; +} + +static int xdb_fflush (FILE *pFile) +{ + if ((uintptr_t)pFile > 0xffff) { + return fflush (pFile); + } + return 0; +} + diff --git a/src/lib/xdb_lib.h b/src/lib/xdb_lib.h index 18b4ab3..4ace96d 100644 --- a/src/lib/xdb_lib.h +++ b/src/lib/xdb_lib.h @@ -15,9 +15,9 @@ #include #include #include -#ifndef _WIN32 +//#ifndef _WIN32 #include -#endif +//#endif #include #include #include @@ -25,36 +25,40 @@ #include #include #include -#ifndef _WIN32 +//#ifndef _WIN32 #include -#endif +//#endif #include #include #include -#ifndef _WIN32 +//#ifndef _WIN32 #include -#endif +//#endif #include -#ifndef _WIN32 +//#ifndef _WIN32 #include -#endif +//#endif #include #include #include #ifdef _WIN32 - #include +#include +#include +#define localtime_r(timep, result) localtime_s(result, timep) +typedef unsigned int in_addr_t; #endif #if defined(__BIG_ENDIAN__) || (defined(__BYTE_ORDER__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__) - #define XDB_BIG_ENDIAN 1 + #define XDB_BIG_ENDIAN 1 #endif #define XDB_ARY_LEN(a) (sizeof(a)/sizeof(a[0])) #define XDB_OFFSET(st,fld) offsetof(st,fld) -#define XDB_ALIGN4(len) ((len + 3) & ~(3)) -#define XDB_ALIGN8(len) ((len + 7) & ~(7)) +#define XDB_ALIGN4(len) ((len + 3) & ~3) +#define XDB_ALIGN8(len) ((len + 7) & ~7) +#define XDB_ALIGN4K(len) ((len + 4093) & ~4093) #ifdef XDB_DEBUG #define xdb_assert(exp) assert(exp) @@ -126,12 +130,18 @@ xdb_wyhash(const void *key, size_t len) XDB_STATIC uint64_t xdb_strcasehash(const char *key, int len); -//XDB_STATIC void xdb_hexdump (const void *addr, int len); +void xdb_hexdump (const void *addr, int len); +#ifndef _WIN32 #if (XDB_ENABLE_SERVER == 1) XDB_STATIC int xdb_signal_block (int signum); #endif +#endif + +static int xdb_fprintf (FILE *pFile, const char *format, ...); +static int xdb_fflush (FILE *pFile); +#define xdb_fputc(c, pFile) xdb_fprintf(pFile, "%c", c) #include "xdb_objm.h" #include "xdb_bmp.h" diff --git a/src/lib/xdb_sock.c b/src/lib/xdb_sock.c index 34abd67..f411d0f 100644 --- a/src/lib/xdb_sock.c +++ b/src/lib/xdb_sock.c @@ -9,14 +9,14 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. ******************************************************************************/ +#if !defined(_WIN32) + #include #include #include #include #include -#if !defined(_WIN32) - #define xdb_sock_open(domain,type,protocol) socket(domain, type, protocol) #define xdb_sock_read(sockfd, buf, len) read(sockfd, buf, len) #define xdb_sock_write(sockfd, buf, len) write(sockfd, buf, len) @@ -125,3 +125,74 @@ #define socket_exit() WSACleanup(); #endif +static inline int xdb_sock_SetTcpNoDelay (int fd, int val) +{ +#ifndef _WIN32 + if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) < 0) { + return -1; + } +#endif + return 0; +} + +XDB_STATIC void xdb_sockaddr_init (struct sockaddr_in *addr, int port, const char *host) +{ + memset (addr, 0, sizeof(*addr)); + addr->sin_family = AF_INET; + addr->sin_port = htons(port); + in_addr_t hadd = inet_addr (host); + memcpy (&addr->sin_addr, &hadd, sizeof (addr->sin_addr)); +} + +static int xdb_sock_vprintf (int sockfd, const char *format, va_list ap) +{ + char buf[32*1024], *pBuf = buf; + va_list dupArgs; + + va_copy(dupArgs, ap); + + int len = vsnprintf(pBuf, sizeof(buf), format, ap); + if (len >= sizeof(buf)) { + pBuf = (char*) xdb_malloc(len + 1 ); + if (NULL != pBuf) { + vsnprintf (pBuf, len+1, format, dupArgs); + pBuf[len] = '\0'; + } + } + + va_end(dupArgs); + + if (NULL != pBuf) { + len = xdb_sock_write (sockfd, pBuf, len); + } + if (pBuf != buf) { + xdb_free (pBuf); + } + + return len; +} + +#if 0 +static int xdb_sock_printf (int sockfd, const char *format, ...) +{ + va_list ap; + + va_start(ap, format); + int len = xdb_sock_vprintf (sockfd, format, ap); + va_end(ap); + + return len; +} + +static int cdb_sock_puts (int sockfd, const char *str) +{ + int len = strlen(str); + len = xdb_sock_write (sockfd, str, strlen(str)); + return len; +} + +static int cdb_sock_putc (int sockfd, const char ch) +{ + return xdb_sock_write (sockfd, &ch, 1); +} +#endif diff --git a/src/lib/xdb_thread.c b/src/lib/xdb_thread.c index 3a79c8d..868a170 100644 --- a/src/lib/xdb_thread.c +++ b/src/lib/xdb_thread.c @@ -71,27 +71,23 @@ xdb_rwlock_wrunlock(xdb_rwlock_t *rwl) __atomic_store_n(&rwl->count, 0, __ATOMIC_RELEASE); } -#ifndef _WIN32 - -#include -#include - typedef pthread_t xdb_thread_t; -#else -#include -typedef DWORD pthread_t; static inline int -pthread_create(pthread_t *thread, const void *attr, void *(*start_routine) (void *), void *arg) +xdb_create_thread (xdb_thread_t *pThread, const void *pAttr, void *(*start_routine) (void *), void *pArg) { - (void) attr; - CreateThread (0, 0, (LPTHREAD_START_ROUTINE)start_routine, arg, 0, thread); - return 0; + return pthread_create (pThread, pAttr, start_routine, pArg); } -#endif +#if 0 +#ifdef _WIN32 +typedef DWORD xdb_thread_t; static inline int -xdb_create_thread (xdb_thread_t *pThread, const void *pAttr, void *(*start_routine) (void *), void *pArg) +xdb_create_thread (xdb_thread_t *thread, const void *attr, void *(*start_routine) (void *), void *arg) { - return pthread_create (pThread, pAttr, start_routine, pArg); + (void) attr; + CreateThread (0, 0, (LPTHREAD_START_ROUTINE)start_routine, arg, 0, thread); + return 0; } +#endif +#endif diff --git a/src/parser/xdb_parser.c b/src/parser/xdb_parser.c index bb278ff..54d4163 100644 --- a/src/parser/xdb_parser.c +++ b/src/parser/xdb_parser.c @@ -50,6 +50,9 @@ xdb_parse_dbtblname (xdb_conn_t *pConn, xdb_token_t *pTkn) #if (XDB_ENABLE_SERVER == 1) #include "xdb_parser_svr.c" #endif +#if (XDB_ENABLE_PUBSUB == 1) +#include "xdb_parser_pubsub.c" +#endif XDB_STATIC xdb_stmt_t* xdb_parse_use (xdb_conn_t* pConn, xdb_token_t *pTkn) @@ -103,6 +106,10 @@ xdb_parse_create (xdb_conn_t* pConn, xdb_token_t *pTkn) #if (XDB_ENABLE_SERVER == 1) return xdb_parse_create_server (pConn, pTkn); #endif + #if (XDB_ENABLE_PUBSUB == 1) + } else if (!strcasecmp (pTkn->token, "SUBSCRIPTION")) { + return xdb_parse_create_sub (pConn, pTkn); + #endif } break; case 'U': @@ -113,6 +120,14 @@ xdb_parse_create (xdb_conn_t* pConn, xdb_token_t *pTkn) return xdb_parse_create_index (pConn, pTkn, true); } break; + case 'P': + case 'p': + #if (XDB_ENABLE_PUBSUB == 1) + if (!strcasecmp (pTkn->token, "PUBLICATION")) { + return xdb_parse_create_pub (pConn, pTkn); + } + #endif + break; } } @@ -253,6 +268,8 @@ xdb_parse_show (xdb_conn_t* pConn, xdb_token_t *pTkn) type = xdb_next_token (pTkn); XDB_PARSE_DBTBLNAME (); } + } else if (!strcasecmp (pTkn->token, "SERVERS")) { + pStmt->stmt_type = XDB_STMT_SHOW_SVR; } else { XDB_SETERR (XDB_E_STMT, "Unknown show object '%s'", pTkn->token); return NULL; @@ -439,6 +456,10 @@ xdb_sql_parse (xdb_conn_t* pConn, char **ppSql, bool bPStmt) pStmt = xdb_parse_source (pConn, &token); } else if (! strcasecmp (token.token, "SHELL")) { pStmt = xdb_parse_shell (pConn, &token); + #if (XDB_ENABLE_PUBSUB == 1) + } else if (! strcasecmp (token.token, "SUBSCRIBE")) { + pStmt = xdb_parse_subscribe (pConn, &token); + #endif } else { goto error; } @@ -571,7 +592,7 @@ xdb_sql_parse (xdb_conn_t* pConn, char **ppSql, bool bPStmt) } else { xdb_res_t* pRes = &pConn->conn_res; pRes->row_data = (uintptr_t)pConn->conn_msg.msg; - pRes->data_len = sizeof (xdb_msg_t) + pConn->conn_msg.len + 1; + pRes->data_len = sizeof (xdb_msg_t) - sizeof(pConn->conn_msg.msg) + pConn->conn_msg.len + 1; } // move to end of token for next stmt diff --git a/src/parser/xdb_parser_db.c b/src/parser/xdb_parser_db.c index b483034..3049e48 100644 --- a/src/parser/xdb_parser_db.c +++ b/src/parser/xdb_parser_db.c @@ -102,9 +102,9 @@ XDB_STATIC xdb_stmt_t* xdb_parse_drop_db (xdb_conn_t* pConn, xdb_token_t *pTkn) { xdb_stmt_db_t *pStmt = &pConn->stmt_union.db_stmt; + memset (pStmt, 0, sizeof (*pStmt)); xdb_token_type type = xdb_next_token (pTkn); pStmt->stmt_type = XDB_STMT_DROP_DB; - pStmt->pSql = NULL; XDB_EXPECT (type<=XDB_TOK_STR, XDB_E_STMT, "Miss database name"); @@ -140,9 +140,9 @@ XDB_STATIC xdb_stmt_t* xdb_parse_open_datadir (xdb_conn_t* pConn, xdb_token_t *pTkn) { xdb_stmt_db_t *pStmt = &pConn->stmt_union.db_stmt; + memset (pStmt, 0, sizeof(*pStmt)); xdb_token_type type = xdb_next_token (pTkn); pStmt->stmt_type = XDB_STMT_OPEN_DATADIR; - pStmt->pSql = NULL; XDB_EXPECT (XDB_TOK_STR==type, XDB_E_STMT, "Miss datadir"); @@ -158,14 +158,13 @@ XDB_STATIC xdb_stmt_t* xdb_parse_open_db (xdb_conn_t* pConn, xdb_token_t *pTkn) { xdb_stmt_db_t *pStmt = &pConn->stmt_union.db_stmt; + memset (pStmt, 0, sizeof(*pStmt)); xdb_token_type type = xdb_next_token (pTkn); pStmt->stmt_type = XDB_STMT_OPEN_DB; - pStmt->pSql = NULL; XDB_EXPECT (XDB_TOK_STR>=type, XDB_E_STMT, "Open DB miss database name"); pStmt->db_name = pTkn->token; - pStmt->lock_mode = 0; return (xdb_stmt_t*)pStmt; error: @@ -177,14 +176,12 @@ XDB_STATIC xdb_stmt_t* xdb_parse_close_db (xdb_conn_t* pConn, xdb_token_t *pTkn) { xdb_stmt_db_t *pStmt = &pConn->stmt_union.db_stmt; + memset (pStmt, 0, sizeof (*pStmt)); xdb_token_type type = xdb_next_token (pTkn); pStmt->stmt_type = XDB_STMT_CLOSE_DB; - pStmt->pSql = NULL; XDB_EXPECT (type<=XDB_TOK_STR, XDB_E_STMT, "Miss database name"); - pStmt->bIfExistOrNot = false; - pStmt->db_name = pTkn->token; pStmt->pDbm = xdb_find_db (pStmt->db_name); @@ -207,13 +204,10 @@ XDB_STATIC xdb_stmt_t* xdb_parse_dump_db (xdb_conn_t* pConn, xdb_token_t *pTkn) { xdb_stmt_backup_t *pStmt = &pConn->stmt_union.backup_stmt; + memset (pStmt, 0, sizeof (*pStmt)); xdb_token_type type = xdb_next_token (pTkn); pStmt->stmt_type = XDB_STMT_DUMP_DB; - pStmt->file = NULL; - pStmt->bNoDrop = false; - pStmt->bNoCreate = false; - pStmt->bNoData = false; if (type >= XDB_TOK_END) { XDB_EXPECT (pConn->pCurDbm != NULL, XDB_E_NODB, XDB_SQL_NO_DB_ERR); diff --git a/src/parser/xdb_parser_svr.c b/src/parser/xdb_parser_svr.c index ec19052..032c4d2 100644 --- a/src/parser/xdb_parser_svr.c +++ b/src/parser/xdb_parser_svr.c @@ -13,8 +13,8 @@ XDB_STATIC xdb_stmt_t* xdb_parse_create_server (xdb_conn_t* pConn, xdb_token_t *pTkn) { xdb_stmt_svr_t *pStmt = &pConn->stmt_union.svr_stmt; + memset (pStmt, 0, sizeof (*pStmt)); pStmt->stmt_type = XDB_STMT_CREATE_SVR; - pStmt->pSql = NULL; xdb_token_type type = xdb_next_token (pTkn); XDB_EXPECT (XDB_TOK_STR>=type, XDB_E_STMT, "Miss server name"); @@ -53,11 +53,12 @@ XDB_STATIC xdb_stmt_t* xdb_parse_drop_server (xdb_conn_t* pConn, xdb_token_t *pTkn) { xdb_stmt_svr_t *pStmt = &pConn->stmt_union.svr_stmt; + memset (pStmt, 0, sizeof (*pStmt)); pStmt->stmt_type = XDB_STMT_DROP_SVR; - pStmt->pSql = NULL; xdb_token_type type = xdb_next_token (pTkn); XDB_EXPECT (XDB_TOK_STR>=type, XDB_E_STMT, "Miss server name"); + pStmt->svr_name = pTkn->token; return (xdb_stmt_t*)pStmt; diff --git a/src/parser/xdb_stmt.h b/src/parser/xdb_stmt.h index 5777347..780c7f7 100644 --- a/src/parser/xdb_stmt.h +++ b/src/parser/xdb_stmt.h @@ -50,8 +50,17 @@ typedef enum { // Prepared STMT // Relication - + XDB_STMT_CREATE_PUB = 20, + XDB_STMT_ALTER_PUB, + XDB_STMT_DROP_PUB, + XDB_STMT_SHOW_PUB, + // Subscription + XDB_STMT_CREATE_SUB = 25, + XDB_STMT_ALTER_SUB, + XDB_STMT_DROP_SUB, + XDB_STMT_SHOW_SUB, + XDB_STMT_SUBSCRIBE, /////////////////////////// @@ -195,6 +204,28 @@ typedef struct { int svr_port; } xdb_stmt_svr_t; +typedef struct { + XDB_STMT_COMMON; + bool bIfExistOrNot; + char *pub_name; +} xdb_stmt_pub_t; + +typedef struct { + XDB_STMT_COMMON; + bool bIfExistOrNot; + char *pub_name; + char *sub_name; + char *pub_host; + int pub_port; +} xdb_stmt_sub_t; + +typedef struct { + XDB_STMT_COMMON; + bool bIfExistOrNot; + char *pub_name; + char *sub_name; +} xdb_stmt_subscribe_t; + typedef enum { XDB_IDX_HASH = 0, XDB_IDX_RBTREE = 1, @@ -245,7 +276,7 @@ typedef struct { xdb_value_t op_val[2]; } xdb_exp_t; -typedef struct { +typedef struct xdb_setfld_t { xdb_field_t *pField; xdb_exp_t exp; } xdb_setfld_t; @@ -411,6 +442,9 @@ typedef union { xdb_stmt_select_t select_stmt; xdb_stmt_set_t set_stmt; xdb_stmt_svr_t svr_stmt; + xdb_stmt_pub_t pub_stmt; + xdb_stmt_subscribe_t subscribe_stmt; + xdb_stmt_sub_t sub_stmt; xdb_stmt_lock_t lock_stmt; xdb_stmt_backup_t backup_stmt; } xdb_stmt_union_t; diff --git a/src/server/xdb_client.c b/src/server/xdb_client.c new file mode 100644 index 0000000..874c840 --- /dev/null +++ b/src/server/xdb_client.c @@ -0,0 +1,223 @@ +/****************************************************************************** +* Copyright (c) 2024-present JC Wang. All rights reserved +* +* https://crossdb.org +* https://github.com/crossdb-org/crossdb +* +* This Source Code Form is subject to the terms of the Mozilla Public +* License, v. 2.0. If a copy of the MPL was not distributed with this +* file, You can obtain one at https://mozilla.org/MPL/2.0/. +******************************************************************************/ + +#ifndef xdb_svrlog +#if XDB_LOG_FLAGS & XDB_LOG_SVR +#define xdb_svrlog(...) xdb_print(__VA_ARGS__) +#else +#define xdb_svrlog(...) +#endif +#endif + +XDB_STATIC int +xdb_reconnect (xdb_conn_t *pConn) +{ + int ret = XDB_E_SOCK; + + pConn->sockfd = xdb_sock_open (AF_INET, SOCK_STREAM, 0); + if (pConn->sockfd < 0) { + xdb_errlog ("Can't create socket\n"); + goto error; + } + struct sockaddr_in serverin; + xdb_sockaddr_init (&serverin, pConn->port, pConn->host); + + ret = xdb_sock_connect (pConn->sockfd, (struct sockaddr*)&serverin, sizeof(serverin)); + if (0 != ret) { + xdb_errlog ("Can't connect %s:%d, %s\n", pConn->host, pConn->port, strerror(errno)); + goto error; + } + + xdb_sock_SetTcpNoDelay (pConn->sockfd, 1); + + xdb_res_t *pRes = xdb_exec (pConn, "SET FORMAT=NATIVELE"); + XDB_RESCHK(pRes, goto error); + if (*pConn->cur_db != '\0') { + pRes = xdb_pexec (pConn, "USE %s", pConn->cur_db); + XDB_RESCHK(pRes, goto error); + } + return XDB_OK; + +error: + if (pConn->sockfd >= 0) { + xdb_sock_close (pConn->sockfd); + pConn->sockfd = -1; + } + return ret; +} + +xdb_conn_t* +xdb_connect (const char *host, const char *user, const char *pass, const char *db, uint16_t port) +{ + xdb_init (); + + if ((NULL == host) || ('\0' == *host)) { + host = "127.0.0.1"; + } else if (0 == port) { + port = XDB_SVR_PORT; + } + + if (0 == port) { + return xdb_open (db); + } + + xdb_conn_t* pConn = xdb_calloc (sizeof (xdb_conn_t)); + if (NULL == pConn) { + return NULL; + } + + xdb_conn_init (pConn); + + if (NULL != db) { + xdb_strcpy (pConn->cur_db, db); + } + xdb_strcpy (pConn->host, host); + pConn->port = port; + pConn->conn_client = true; + + int ret = xdb_reconnect (pConn); + if (XDB_OK == ret) { + xdb_atomic_inc (&s_xdb_conn_count); + return pConn; + } + + xdb_free (pConn); + return NULL; +} + +XDB_STATIC xdb_res_t* +xdb_fetch_res_sock (xdb_conn_t *pConn) +{ + xdb_res_t *pRes = &pConn->conn_res; + + // read result + uint32_t len = xdb_sock_read (pConn->sockfd, pRes, sizeof(*pRes)); + if (len < sizeof(*pRes)) { + return NULL; + } + + // read + int reslen = sizeof (xdb_queryRes_t) + pRes->data_len; + if (0 == pRes->meta_len) { + if (pRes->data_len > 0) { + len = xdb_sock_read (pConn->sockfd, &pConn->conn_msg, pRes->data_len); + if (len < pRes->data_len) { + return NULL; + } + pRes->row_data = (uintptr_t)pConn->conn_msg.msg; + } + } else { + + pRes = xdb_queryres_alloc (pConn, reslen + pRes->meta_len + pRes->col_count * 8 + 7); + if (NULL != pRes) { + return pRes; + } + xdb_queryRes_t *pQueryRes = pConn->pQueryRes; + + pRes = &pQueryRes->res; + pQueryRes->res = pConn->conn_res; + pQueryRes->pConn = pConn; + pQueryRes->pStmt = NULL; + + int64_t rdlen = sizeof (xdb_queryRes_t); + while (rdlen < reslen) { + len = xdb_sock_read (pConn->sockfd, (void*)pQueryRes+rdlen, reslen-rdlen); + if (len > 0) { + rdlen += len; + } else { + return NULL; + } + } + xdb_svrlog ("get response %d from server\n", sizeof(*pRes) + pRes->data_len); +#if XDB_LOG_FLAGS & XDB_LOG_SVR + //xdb_hexdump (pRes, sizeof(*pRes) + pRes->data_len); +#endif + pRes->row_data = (uintptr_t)&pQueryRes->rowlist; + xdb_meta_t *pMeta = (xdb_meta_t*)(pRes + 1); + pRes->col_meta = (uintptr_t)pMeta; + pMeta->col_list = ((uintptr_t)pQueryRes + sizeof (xdb_queryRes_t) + pRes->data_len + 7) & (~7LL); + uint64_t *pColList = (uint64_t*)pMeta->col_list; + xdb_col_t *pCol = (xdb_col_t*)((void*)pMeta + pMeta->cols_off); + while (pCol->col_len) { + *pColList++ = (uintptr_t)pCol; + pCol = (void*)pCol + pCol->col_len; + } + xdb_init_rowlist (pQueryRes); + xdb_fetch_rows (pRes); + pConn->ref_cnt++; + } + + return pRes; +} + +XDB_STATIC xdb_res_t* +xdb_exec_client (xdb_conn_t *pConn, const char *sql, int len) +{ + xdb_res_t *pRes = &pConn->conn_res; + if (xdb_unlikely (pConn->sockfd < 0)) { + int ret = xdb_reconnect (pConn); + XDB_EXPECT(ret == XDB_OK, XDB_E_SOCK, "Can't connect server"); + } + xdb_svrlog ("send: '%s'\n", sql); + // send socket + char buf[64]; + int nn = sprintf (buf, "$%d\n", len); + int wlen = xdb_sock_write (pConn->sockfd, buf, nn); + XDB_EXPECT_SOCK(wlen == nn, XDB_E_SOCK, "Socket Error write %d of %d", wlen, nn); + wlen = xdb_sock_write (pConn->sockfd, sql, len); + XDB_EXPECT_SOCK(wlen == len, XDB_E_SOCK, "Socket Error write %d of %d", wlen, len); + pRes = xdb_fetch_res_sock (pConn); + if ((XDB_STMT_USE_DB == pRes->stmt_type) && (0 == pRes->errcode) && pRes->row_data) { + xdb_strcpy (pConn->cur_db, (char*)pRes->row_data); + } + +error: + return pRes; +} + +// source | dump | shell | help +XDB_STATIC bool +xdb_is_local_stmt (const char *sql) +{ + switch (sql[0]) { + case 's': + case 'S': + switch (sql[1]) { + case 'o': + case 'O': + if (!strncasecmp (sql, "SOURCE", 6)) { + return true; + } + break; + case 'h': + case 'H': + if (!strncasecmp (sql, "SHELL", 5)) { + return true; + } + break; + } + break; + case 'd': + case 'D': + if (!strncasecmp (sql, "DUMP", 4)) { + return true; + } + break; + case 'h': + case 'H': + if (!strncasecmp (sql, "HELP", 4)) { + return true; + } + break; + } + return false; +} + diff --git a/src/server/xdb_server.c b/src/server/xdb_server.c new file mode 100644 index 0000000..a5007b8 --- /dev/null +++ b/src/server/xdb_server.c @@ -0,0 +1,293 @@ +/****************************************************************************** +* Copyright (c) 2024-present JC Wang. All rights reserved +* +* https://crossdb.org +* https://github.com/crossdb-org/crossdb +* +* This Source Code Form is subject to the terms of the Mozilla Public +* License, v. 2.0. If a copy of the MPL was not distributed with this +* file, You can obtain one at https://mozilla.org/MPL/2.0/. +******************************************************************************/ + +#ifndef xdb_svrlog +#if XDB_LOG_FLAGS & XDB_LOG_SVR +#define xdb_svrlog(...) xdb_print(__VA_ARGS__) +#else +#define xdb_svrlog(...) +#endif +#endif + +static xdb_objm_t s_xdb_svr_list; + +XDB_STATIC const char* +xdb_ip2str (uint32_t ip) +{ + static char idx = 0; + static char bufpool[8][16]; + char *buf = bufpool[idx++&7]; + sprintf (buf, "%d.%d.%d.%d", ip>>24, (ip>>16)&0xff, (ip>>8)&0xff, ip&0xff); + return buf; +} + +XDB_STATIC void* +xdb_handle_client (void *pArg) +{ + int len, rlen; + xdb_conn_t* pConn = pArg; + int clientfd = pConn->sockfd; + XDB_BUF_DEF(sqlbuf, 65536); + + xdb_sock_SetTcpNoDelay (pConn->sockfd, 1); + + while ((len = xdb_sock_read (clientfd, sqlbuf, sqlbuf_size - 1)) > 0) { + sqlbuf[len] = '\0'; + xdb_svrlog ("%d recv %d: %s\n", clientfd, len, sqlbuf); + char *sql = sqlbuf; + if ('$' == *sqlbuf) { + int slen = 0; + sql++; + while (*sql && *sql != '\n') { + slen = slen * 10 + (*sql++ - '0'); + } + if (*sql != '\n') { + break; + } + sql++; + rlen = len - ((void*)sql - (void*)sqlbuf); + if (rlen < slen) { + int nlen = slen - rlen; + int new_len = len + nlen + 1; + if (new_len >= sqlbuf_size) { + new_len = XDB_ALIGN4K(new_len); + uint32_t sqloff = sql - sqlbuf; + XDB_BUF_ALLOC(sqlbuf, new_len); + if (NULL == sqlbuf) { + break; + } + sql = sqlbuf + sqloff; + } + int nn = xdb_sock_read (clientfd, sqlbuf + len, nlen); + if (nn < nlen) { + break; + } + } + len = slen; + sql[len] = '\0'; + xdb_svrlog ("%d run %d: %s\n", clientfd, len, sql); + } else { + if ((!strncasecmp (sqlbuf, "exit", 4) || !strncasecmp (sqlbuf, "quit", 4)) && isspace(sqlbuf[4])) { + break; + } + rlen = 0; + if (!xdb_is_sql_complete (sqlbuf, false)) { + while ((rlen = xdb_sock_read (clientfd, sqlbuf + len, sqlbuf_size)) > 0) { + len += rlen; + sqlbuf[len] = '\0'; + if (xdb_is_sql_complete (sqlbuf, false)) { + break; + } + } + if (rlen <= 0) { + break; + } + + } + } + xdb_exec_out (pConn, sql, len); + } + + xdb_svrlog ("close %d\n", clientfd); + XDB_BUF_FREE(sqlbuf); + xdb_close (pConn); + return NULL; +} + +XDB_STATIC void* +xdb_run_server (void *pArg) +{ + int ret = -1; + xdb_server_t *pServer = pArg; + int port = pServer->svr_port; + + struct sockaddr_in serverin; + xdb_sockaddr_init (&serverin, port, "0.0.0.0"); + unsigned int sock_val = 1; + + int sockfd = xdb_sock_open (AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) { + xdb_errlog ("socket() error : %s", strerror(errno)); + ret = -1; + goto exit; + } + + setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_val, sizeof(sock_val)); +#if !defined(_WIN32) && !defined(__CYGWIN__) && !defined(__MSYS__) + setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&sock_val, sizeof(sock_val)); +#endif + + ret = bind(sockfd, (struct sockaddr*)&serverin, sizeof (serverin)); + if (ret < 0) { + xdb_errlog ("fd %d %s:%d bind() error %s", sockfd, xdb_ip2str(0), port, strerror(errno)); + goto exit; + } + + xdb_svrlog ("Run server %s:%d\n", XDB_OBJ_NAME(pServer), port); + pServer->sockfd = sockfd; + + while (!pServer->bDrop) { + ret = listen(sockfd, 5); + if (ret < 0) { + xdb_errlog ("fd %d %s:%d bind() listen() error %s", sockfd, xdb_ip2str(0), port, strerror(errno)); + goto exit;; + } + + struct sockaddr_in cliaddr_in; + socklen_t clilen = sizeof(cliaddr_in); + int clientfd; + clientfd = accept(sockfd, (struct sockaddr *)&cliaddr_in, &clilen); + if(clientfd < 0) { + xdb_errlog ("%d Can't accept: %s\n", sockfd, strerror(errno)); + continue; + } + if (pServer->bDrop) { + close (clientfd); + goto exit; + } + xdb_svrlog ("Accept new client %d %s:%d...\n", clientfd, xdb_ip2str(ntohl(cliaddr_in.sin_addr.s_addr)), ntohs(cliaddr_in.sin_port)); + + xdb_thread_t thread; + xdb_conn_t* pConn = xdb_open (NULL); + if (NULL != pConn) { + pConn->sockfd = clientfd; + pConn->conn_stdout = fdopen (pConn->sockfd, "w"); + if (NULL == pConn->conn_stdout) { + pConn->conn_stdout = (void*)(uintptr_t)pConn->sockfd; + } + pConn->pServer = pServer; + xdb_create_thread (&thread, NULL, xdb_handle_client, (void*)pConn); + } else { + xdb_sock_close (clientfd); + } + } + +exit: + + xdb_svrlog ("Exit server %s:%d\n", XDB_OBJ_NAME(pServer), port); + + xdb_free (pServer); + + return NULL; +} + +XDB_STATIC xdb_server_t* +xdb_find_server (const char *svr_name) +{ + return xdb_objm_get (&s_xdb_svr_list, svr_name); +} + +XDB_STATIC int +xdb_create_server (xdb_stmt_svr_t *pStmt) +{ + xdb_server_t *pSvr = xdb_find_server (pStmt->svr_name); + if (pStmt->bIfExistOrNot && (NULL != pSvr)) { + return XDB_OK; + } + + xdb_conn_t* pConn = pStmt->pConn; + xdb_server_t *pServer = xdb_calloc (sizeof(xdb_server_t)); + XDB_EXPECT (NULL != pServer, XDB_E_MEMORY, "Can't alloc memory"); + xdb_strcpy (XDB_OBJ_NAME(pServer), pStmt->svr_name); + pServer->svr_port = pStmt->svr_port; + + XDB_OBJ_ID(pServer) = -1; + xdb_objm_add (&s_xdb_svr_list, pServer); + + xdb_sysdb_add_svr (pServer); + + xdb_create_thread (&pServer->tid, NULL, xdb_run_server, pServer); + + return 0; + +error: + xdb_free (pServer); + return -pConn->conn_res.errcode; +} + +XDB_STATIC int +xdb_drop_server (xdb_stmt_svr_t *pStmt) +{ + xdb_server_t *pSvr = xdb_find_server (pStmt->svr_name); + if (pStmt->bIfExistOrNot && (NULL == pSvr)) { + return XDB_OK; + } + + xdb_sysdb_del_svr (pSvr); + + pSvr->bDrop = true; + + int sockfd = xdb_sock_open (AF_INET, SOCK_STREAM, 0); + if (sockfd >= 0) { + struct sockaddr_in serverin; + xdb_sockaddr_init (&serverin, pSvr->svr_port, "127.0.0.1"); + xdb_sock_connect(sockfd, (struct sockaddr*)&serverin, sizeof(serverin)); + xdb_sock_close (sockfd); + } + + return XDB_OK; +} + +XDB_STATIC void xdb_native_out (xdb_conn_t *pConn, xdb_res_t *pRes) +{ + uint64_t slen = (pRes->len_type & XDB_LEN_MASK) + pRes->data_len; + xdb_meta_t *pMeta = (xdb_meta_t*)pRes->col_meta; + memcpy (pRes + 1, pMeta, pRes->meta_len); + size_t len = xdb_sock_write (pConn->sockfd, pRes, slen); + if (len < slen) { + xdb_errlog ("send %"PRIi64" < %"PRIi64"\n", len, slen); + } + xdb_svrlog ("send response %d to client\n", len); +#if XDB_LOG_FLAGS & XDB_LOG_SVR + //xdb_hexdump (pRes, len); +#endif +} + +static char *s_xdb_banner_server = +" _____ _____ ____ _ \n" +" / ____| | __ \\| _ \\ _| |_ CrossDB Server v%s\n" +" | | _ __ ___ ___ ___| | | | |_) | |_ _| Port: %d\n" +" | | | '__/ _ \\/ __/ __| | | | _ < |_| DataDir: %s\n" +" | |____| | | (_) \\__ \\__ \\ |__| | |_) | \n" +" \\_____|_| \\___/|___/___/_____/|____/ https://crossdb.org\n\n"; + + +int +xdb_start_server (const char *host, uint16_t port, const char *datadir, bool bQuite) +{ + if (!bQuite) { + xdb_print (s_xdb_banner_server, xdb_version(), port, datadir); + } + + if (NULL == datadir) { + datadir = "/var/lib/crossdb"; + } + xdb_mkdir (datadir); + + xdb_conn_t *pConn = xdb_open (NULL); + + xdb_pexec (pConn, "OPEN DATADIR '%s'", datadir); + + xdb_pexec (pConn, "SET DATADIR='%s'", datadir); + + xdb_pexec (pConn, "CREATE SERVER crossdb PORT=%d", port); + + if (!bQuite) { + xdb_shell_loop (pConn, NULL, NULL, true); + } else { + while (1) { + sleep (1000); + } + } + + return XDB_OK; +} + diff --git a/src/server/xdb_server.h b/src/server/xdb_server.h new file mode 100644 index 0000000..aaf662e --- /dev/null +++ b/src/server/xdb_server.h @@ -0,0 +1,35 @@ +/****************************************************************************** +* Copyright (c) 2024-present JC Wang. All rights reserved +* +* https://crossdb.org +* https://github.com/crossdb-org/crossdb +* +* This Source Code Form is subject to the terms of the Mozilla Public +* License, v. 2.0. If a copy of the MPL was not distributed with this +* file, You can obtain one at https://mozilla.org/MPL/2.0/. +******************************************************************************/ + +#ifndef __XDB_SERVER_H__ +#define __XDB_SERVER_H__ + +#define XDB_SVR_PORT 7777 +#ifndef _WIN32 +#define XDB_DATA_DIR "/var/xdb_data" +#else +#define XDB_DATA_DIR "c:/crossdb/xdb_data" +#endif + +typedef struct xdb_server_t { + xdb_obj_t obj; + int svr_port; + int sockfd; + bool bDrop; + xdb_thread_t tid; +} xdb_server_t; + +#if (XDB_ENABLE_PUBSUB == 1) +XDB_STATIC int +xdb_pub_notify (const char *sql, int len); +#endif + +#endif // __XDB_SERVER_H__ diff --git a/src/xdb-cli.c b/src/xdb-cli.c index b8ef0b5..1b1b277 100644 --- a/src/xdb-cli.c +++ b/src/xdb-cli.c @@ -9,30 +9,30 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. ******************************************************************************/ -#define XDB_CLI #include "crossdb.c" int main (int argc, char **argv) { - char *db = ":memory:"; + char *db = NULL; char ch; - bool bServer = false, bNeedPasswd = false; + bool bServer = false, bNeedPasswd = false, bQuite = false; #if (XDB_ENABLE_SERVER == 1) char *datadir = NULL, *host = NULL, *user = NULL, *password = NULL; int port = 0; #endif char *esql = NULL; - while ((ch = getopt(argc, argv, "h:u:p:e:P:D:R:S")) != -1) { + while ((ch = getopt(argc, argv, "h:u:p:e:P:D:R:Sq")) != -1) { switch (ch) { case '?': xdb_print ("Usage: xdb-cli [OPTIONS] [[path]/db_name]\n"); xdb_print (" -? Show this help\n"); #if (XDB_ENABLE_SERVER == 1) - xdb_print (" -S Server: Start in server mode\n"); + xdb_print (" -S Server: Start in server mode, default port %d\n", XDB_SVR_PORT); xdb_print (" -h IP address to bind to or connect to\n"); xdb_print (" -P Port to listen or connect\n"); - xdb_print (" -D Server: Data directory to store databases\n"); + xdb_print (" -D Server: Data directory to store databases, default '%s'\n", XDB_DATA_DIR); + xdb_print (" -q Server: quite mode.\n"); xdb_print (" -u Client user\n"); xdb_print (" -p Client password\n"); #endif // XDB_ENABLE_SERVER @@ -50,9 +50,15 @@ int main (int argc, char **argv) break; case 'h': host = optarg; + if (0 == port) { + port = XDB_SVR_PORT; + } break; case 'P': port = atoi (optarg); + if (NULL == host) { + host = "127.0.0.1"; + } break; case 'D': datadir = optarg; @@ -67,14 +73,18 @@ int main (int argc, char **argv) case 'e': esql = optarg; break; + case 'q': + bQuite = true; + break; } } if (bServer) { #if (XDB_ENABLE_SERVER == 1) - xdb_start_server (host, port, datadir, NULL); + xdb_start_server (host, port, datadir, bQuite); #endif } else { + s_xdb_cli = true; if (bNeedPasswd) { xdb_print ("Enter password:\n"); } @@ -82,15 +92,15 @@ int main (int argc, char **argv) db = argv[argc-1]; } #if (XDB_ENABLE_SERVER == 1) - xdb_conn_t *pConn = xdb_connect (host, user, password, db, port); + xdb_conn_t *pConn = xdb_connect (host, user, password, (host || db) ? db : ":memory:", port); #else - xdb_conn_t *pConn = xdb_open (db); + xdb_conn_t *pConn = xdb_open (db ? db : ":memory:"); #endif if (NULL != pConn) { if (NULL != esql) { xdb_shell_process (pConn, esql); } else { - xdb_shell_loop (pConn, NULL, NULL); + xdb_shell_loop (pConn, NULL, NULL, bQuite); } xdb_close (pConn); } diff --git a/winbuild.cmd b/winbuild.cmd index 0cbc7af..fa27bd6 100644 --- a/winbuild.cmd +++ b/winbuild.cmd @@ -1,5 +1,5 @@ IF not exist build (mkdir build) gcc -o build/libcrossdb.dll src/crossdb.c -fPIC -shared -lws2_32 -lpthread -static -O2 -Wl,--out-implib,build/libcrossdb.lib gcc -o build/xdb-cli src/xdb-cli.c -lws2_32 -lpthread -static -O2 -copy /Y src\crossdb.h build\ +copy /Y include\crossdb.h build\ xcopy /ehiqy build\* c:\crossdb\ \ No newline at end of file