Skip to content

Commit

Permalink
[opt](inverted index)Optimize code to get rid of heap use after free (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
qidaye authored Dec 27, 2024
1 parent 69961a3 commit 4746e9e
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 96 deletions.
16 changes: 16 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,19 @@ const char* DorisCompoundReader::getObjectName() const {
}

bool DorisCompoundReader::list(std::vector<std::string>* names) const {
if (_closed || entries == nullptr) {
_CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
}
for (EntriesType::const_iterator i = entries->begin(); i != entries->end(); i++) {
names->push_back(i->first);
}
return true;
}

bool DorisCompoundReader::fileExists(const char* name) const {
if (_closed || entries == nullptr) {
_CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
}
return entries->exists((char*)name);
}

Expand All @@ -237,6 +243,9 @@ int64_t DorisCompoundReader::fileModified(const char* name) const {
}

int64_t DorisCompoundReader::fileLength(const char* name) const {
if (_closed || entries == nullptr) {
_CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
}
ReaderFileEntry* e = entries->get((char*)name);
if (e == nullptr) {
char buf[CL_MAX_PATH + 30];
Expand All @@ -251,6 +260,9 @@ int64_t DorisCompoundReader::fileLength(const char* name) const {
bool DorisCompoundReader::openInput(const char* name,
std::unique_ptr<lucene::store::IndexInput>& ret,
CLuceneError& error, int32_t bufferSize) {
if (_closed || entries == nullptr) {
_CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
}
lucene::store::IndexInput* tmp;
bool success = openInput(name, tmp, error, bufferSize);
if (success) {
Expand Down Expand Up @@ -294,6 +306,10 @@ void DorisCompoundReader::close() {
_CLDELETE(stream)
}
if (entries != nullptr) {
// The life cycle of _entries should be consistent with that of the DorisCompoundReader.
// DO NOT DELETE _entries here, it will be deleted in the destructor
// When directory is closed, all _entries are cleared. But the directory may be called in other places.
// If we delete the _entries object here, it will cause core dump.
entries->clear();
}
if (ram_dir) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory {
std::string directory;
std::string file_name;
CL_NS(store)::IndexInput* stream = nullptr;
// The life cycle of entries should be consistent with that of the DorisCompoundReader.
EntriesType* entries = nullptr;
std::mutex _this_lock;
bool _closed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,10 +634,8 @@ bool DorisRAMFSDirectory::doDeleteFile(const char* name) {
SCOPED_LOCK_MUTEX(this->THIS_LOCK);
sizeInBytes -= itr->second->sizeInBytes;
filesMap->removeitr(itr);
return true;
} else {
return false;
}
return true;
}

bool DorisRAMFSDirectory::deleteDirectory() {
Expand Down
12 changes: 8 additions & 4 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,14 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {

void close_on_error() override {
try {
if (_index_writer) {
_index_writer->close();
}
// delete directory must be done before index_writer close
// because index_writer will close the directory
if (_dir) {
_dir->deleteDirectory();
}
if (_index_writer) {
_index_writer->close();
}
} catch (CLuceneError& e) {
LOG(ERROR) << "InvertedIndexWriter close_on_error failure: " << e.what();
}
Expand Down Expand Up @@ -664,11 +666,13 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
std::unique_ptr<lucene::document::Document> _doc = nullptr;
lucene::document::Field* _field = nullptr;
bool _single_field = true;
// Since _index_writer's write.lock is created by _dir.lockFactory,
// _dir must destruct after _index_writer, so _dir must be defined before _index_writer.
DorisFSDirectory* _dir = nullptr;
std::unique_ptr<lucene::index::IndexWriter> _index_writer = nullptr;
std::unique_ptr<lucene::analysis::Analyzer> _analyzer = nullptr;
std::unique_ptr<lucene::util::Reader> _char_string_reader = nullptr;
std::shared_ptr<lucene::util::bkd::bkd_writer> _bkd_writer = nullptr;
DorisFSDirectory* _dir = nullptr;
const KeyCoder* _value_key_coder;
const TabletIndex* _index_meta;
InvertedIndexParserType _parser_type;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,35 @@
// specific language governing permissions and limitations
// under the License.

suite("test_index_compound_directory_failure_injection", "nonConcurrent") {
suite("test_index_compound_directory_fault_injection", "nonConcurrent") {
// define a sql table
def testTable_dup = "httplogs_dup_compound"
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

def set_be_config = { key, value ->
for (String backend_id: backendId_to_backendIP.keySet()) {
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
}
}

def check_config = { String key, String value ->
for (String backend_id: backendId_to_backendIP.keySet()) {
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List
for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == key) {
assertEquals(value, ((List<String>) ele)[2])
}
}
}
}

def create_httplogs_dup_table = {testTablex ->
// multi-line sql
Expand Down Expand Up @@ -85,74 +111,111 @@ suite("test_index_compound_directory_failure_injection", "nonConcurrent") {
}
}

try {
sql "DROP TABLE IF EXISTS ${testTable_dup}"
create_httplogs_dup_table.call(testTable_dup)

try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
try {
def test_index_compound_directory = "test_index_compound_directory1"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'gif'"
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} catch(Exception ex) {
logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer, result: " + ex)
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
}
try {
def test_index_compound_directory = "test_index_compound_directory2"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'images'"
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
}
def run_test = {String is_enable ->
boolean invertedIndexRAMDirEnable = false
boolean has_update_be_config = false
try {
def test_index_compound_directory = "test_index_compound_directory3"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'png'"
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
String backend_id;
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))

logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List

for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == "inverted_index_ram_dir_enable") {
invertedIndexRAMDirEnable = Boolean.parseBoolean(((List<String>) ele)[2])
logger.info("inverted_index_ram_dir_enable: ${((List<String>) ele)[2]}")
}
}
set_be_config.call("inverted_index_ram_dir_enable", is_enable)
has_update_be_config = true
// check updated config
check_config.call("inverted_index_ram_dir_enable", is_enable);

sql "DROP TABLE IF EXISTS ${testTable_dup}"
create_httplogs_dup_table.call(testTable_dup)

try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
}
def res = sql "select COUNT() from ${testTable_dup} where request match 'images'"
assertEquals(863, res[0][0])
sql "TRUNCATE TABLE ${testTable_dup}"

try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
}
res = sql "select COUNT() from ${testTable_dup} where request match 'images'"
assertEquals(0, res[0][0])
sql "TRUNCATE TABLE ${testTable_dup}"

try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
}
res = sql "select COUNT() from ${testTable_dup} where request match 'images'"
assertEquals(0, res[0][0])
sql "TRUNCATE TABLE ${testTable_dup}"

try {
def test_index_compound_directory = "test_index_compound_directory1"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
res = sql "select COUNT() from ${test_index_compound_directory} where request match 'gif'"
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} catch(Exception ex) {
assertTrue(ex.toString().contains("failed to initialize storage reader"))
logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer, result: " + ex)
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
}

try {
def test_index_compound_directory = "test_index_compound_directory2"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
res = sql "select COUNT() from ${test_index_compound_directory} where request match 'images'"
assertEquals(0, res[0][0])
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
}

try {
def test_index_compound_directory = "test_index_compound_directory3"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
res = sql "select COUNT() from ${test_index_compound_directory} where request match 'png'"
assertEquals(0, res[0][0])
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
}
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
if (has_update_be_config) {
set_be_config.call("inverted_index_ram_dir_enable", invertedIndexRAMDirEnable.toString())
}
}
} finally {
//try_sql("DROP TABLE IF EXISTS ${testTable}")
}

run_test.call("false")
run_test.call("true")
}

0 comments on commit 4746e9e

Please sign in to comment.