Skip to content

Commit ae279e6

Browse files
committed
Implement flush_all feature
1 parent c4be8c4 commit ae279e6

21 files changed

+288
-50
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ venv/
1414
bench_venv/
1515
_client.cpp
1616
_client.so
17+
_client.*.so
1718
.tox/
1819

1920
.cache/

ext/gtest/CMakeLists.txt.in

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ include(ExternalProject)
55

66
ExternalProject_Add(googletest
77
GIT_REPOSITORY https://github.com/google/googletest.git
8-
GIT_TAG master
8+
GIT_TAG release-1.8.0
99
SOURCE_DIR "${CMAKE_BINARY_DIR}/googletest-src"
1010
BINARY_DIR "${CMAKE_BINARY_DIR}/googletest-build"
1111
CMAKE_ARGS -DCMAKE_ARCHIVE_OUTPUT_DIRECTORY_DEBUG:PATH=DebugLibs

include/Client.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ DECL_RETRIEVAL_CMD(gets)
5050
err_code_t version(broadcast_result_t** results, size_t* nHosts);
5151
err_code_t quit();
5252
err_code_t stats(broadcast_result_t** results, size_t* nHosts);
53+
err_code_t flushAll(broadcast_result_t** results, size_t* nHosts);
5354

5455
// touch
5556
err_code_t touch(const char* const* keys, const size_t* keyLens,
@@ -65,18 +66,23 @@ DECL_RETRIEVAL_CMD(gets)
6566
const bool noreply,
6667
unsigned_result_t** result, size_t* nResults);
6768

69+
inline void toggleFlushAllFeature(bool enabled) {
70+
m_flushAllEnabled = enabled;
71+
}
6872
void _sleep(uint32_t seconds); // check GIL in Python
6973

7074
protected:
7175
void collectRetrievalResult(retrieval_result_t*** results, size_t* nResults);
7276
void collectMessageResult(message_result_t*** results, size_t* nResults);
73-
void collectBroadcastResult(broadcast_result_t** results, size_t* nHosts);
77+
void collectBroadcastResult(broadcast_result_t** results, size_t* nHosts, bool isFlushAll=false);
7478
void collectUnsignedResult(unsigned_result_t** results, size_t* nResults);
7579

7680
std::vector<retrieval_result_t*> m_outRetrievalResultPtrs;
7781
std::vector<message_result_t*> m_outMessageResultPtrs;
7882
std::vector<broadcast_result_t> m_outBroadcastResultPtrs;
7983
std::vector<unsigned_result_t*> m_outUnsignedResultPtrs;
84+
85+
bool m_flushAllEnabled;
8086
};
8187

8288
} // namespace mc

include/ConnectionPool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class ConnectionPool {
4141

4242
void collectRetrievalResult(std::vector<retrieval_result_t*>& results);
4343
void collectMessageResult(std::vector<message_result_t*>& results);
44-
void collectBroadcastResult(std::vector<broadcast_result_t>& results);
44+
void collectBroadcastResult(std::vector<broadcast_result_t>& results, bool isFlushAll=false);
4545
void collectUnsignedResult(std::vector<unsigned_result_t*>& results);
4646
void reset();
4747
void setPollTimeout(int timeout);

include/Export.h

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,6 @@ typedef uint32_t flags_t;
4444
typedef uint64_t cas_unique_t;
4545

4646

47-
typedef struct {
48-
char* host;
49-
char** lines;
50-
size_t* line_lens;
51-
size_t len;
52-
} broadcast_result_t;
53-
54-
5547
typedef struct {
5648
char* key; // 8B
5749
char* data_block; // 8B
@@ -63,7 +55,8 @@ typedef struct {
6355

6456

6557
enum message_result_type {
66-
MSG_EXISTS,
58+
MSG_LIBMC_INVALID = -1,
59+
MSG_EXISTS = 0,
6760
MSG_OK,
6861
MSG_STORED,
6962
MSG_NOT_STORED,
@@ -85,3 +78,16 @@ typedef struct {
8578
size_t key_len;
8679
uint64_t value;
8780
} unsigned_result_t;
81+
82+
83+
// For flush_all command, we need to specify
84+
// {host} and {msg_type},
85+
// for other broadcast commands, we need to specify
86+
// all fields except {msg_type}
87+
typedef struct {
88+
char* host;
89+
char** lines;
90+
size_t* line_lens;
91+
size_t len;
92+
enum message_result_type msg_type; // for flush_all command
93+
} broadcast_result_t;

include/Keywords.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ static const char k_NOREPLY[] = " noreply";
3131

3232
static const char kVERSION[] = "version";
3333
static const char kSTATS[] = "stats";
34+
static const char kFLUSHALL[] = "flush_all";
3435

3536
static const char kQUIT[] = "quit";
3637

include/c_client.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ extern "C" {
6363
void client_destroy_unsigned_result(void* client);
6464

6565
err_code_t client_stats(void* client, broadcast_result_t** results, size_t* n_servers);
66+
void client_toggle_flush_all_feature(void* client, bool enabled);
67+
err_code_t client_flush_all(void* client, broadcast_result_t** results, size_t* n_servers);
6668
err_code_t client_quit(void* client);
6769

6870
const char* err_code_to_string(err_code_t err);

libmc/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@
2828
)
2929

3030
__VERSION__ = "1.2.0"
31-
__version__ = "v1.2.1"
31+
__version__ = "v1.2.1-4-gc3c6500"
3232
__author__ = "mckelvin"
3333
__email__ = "[email protected]"
34-
__date__ = "Wed Mar 13 14:40:14 2019 +0800"
34+
__date__ = "Tue Jul 30 14:06:44 2019 +0800"
3535

3636

3737
class Client(PyClient):

libmc/_client.pyx

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,13 +72,8 @@ cdef extern from "Export.h":
7272
uint32_t bytes
7373
cas_unique_t cas_unique
7474

75-
ctypedef struct broadcast_result_t:
76-
char* host
77-
char** lines
78-
size_t* line_lens
79-
size_t len
80-
8175
ctypedef enum message_result_type:
76+
MSG_LIBMC_INVALID
8277
MSG_EXISTS
8378
MSG_OK
8479
MSG_STORED
@@ -109,6 +104,13 @@ cdef extern from "Export.h":
109104
size_t key_len
110105
uint64_t value
111106

107+
ctypedef struct broadcast_result_t:
108+
char* host
109+
char** lines
110+
size_t* line_lens
111+
size_t len
112+
message_result_type msg_type;
113+
112114

113115
cdef extern from "Client.h" namespace "douban::mc":
114116
cdef cppclass Client:
@@ -189,6 +191,8 @@ cdef extern from "Client.h" namespace "douban::mc":
189191
err_code_t version(broadcast_result_t** results, size_t* nHosts) nogil
190192
err_code_t quit() nogil
191193
err_code_t stats(broadcast_result_t** results, size_t* nHosts) nogil
194+
err_code_t flushAll(broadcast_result_t** results, size_t* nHosts) nogil
195+
void toggleFlushAllFeature(bool_t enabled)
192196
void destroyBroadcastResult() nogil
193197

194198
err_code_t incr(
@@ -952,6 +956,30 @@ cdef class PyClient:
952956
self._imp.destroyBroadcastResult()
953957
return rv
954958

959+
def toggle_flush_all_feature(self, enabled):
960+
self._imp.toggleFlushAllFeature(enabled)
961+
962+
def flush_all(self):
963+
self._record_thread_ident()
964+
cdef broadcast_result_t* rst = NULL
965+
cdef size_t n = 0
966+
with nogil:
967+
self.last_error = self._imp.flushAll(&rst, &n)
968+
969+
rv = []
970+
for i in range(n):
971+
if rst[i].msg_type == MSG_OK:
972+
rv.append(rst[i].host)
973+
974+
with nogil:
975+
self._imp.destroyBroadcastResult()
976+
if self.last_error == RET_PROGRAMMING_ERR:
977+
raise RuntimeError(
978+
"Please call toggle_flush_all_feature(true) first "
979+
"to enable the flush_all feature."
980+
)
981+
return rv
982+
955983
def quit(self):
956984
self._record_thread_ident()
957985
with nogil:

misc/update_version.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ for VERSIONING_FILE in $VERSIONING_FILES
77
do
88
TMPFILE=$VERSIONING_FILE".2"
99
cat $VERSIONING_FILE | \
10-
python $VERSIONING_SCRIPT --clean | \
11-
python $VERSIONING_SCRIPT > $TMPFILE
10+
python2 $VERSIONING_SCRIPT --clean | \
11+
python2 $VERSIONING_SCRIPT > $TMPFILE
1212
mv $TMPFILE $VERSIONING_FILE
1313
git add $VERSIONING_FILE
1414
done

0 commit comments

Comments
 (0)