From 949ad8be04316e8fbbd8c275d9bf9e88cf19e32e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Fri, 27 Dec 2024 10:18:37 +0800 Subject: [PATCH] branch-3.0: [opt](inverted index)Optimize code to get rid of heap use after free #45745 (#46051) Cherry-picked from #45745 Co-authored-by: qiye --- .../inverted_index_compound_reader.cpp | 17 ++ .../inverted_index_compound_reader.h | 1 + .../inverted_index_fs_directory.cpp | 4 +- .../segment_v2/inverted_index_writer.cpp | 12 +- ...dex_compound_directory_fault_injection.out | 22 -- ..._compound_directory_fault_injection.groovy | 190 ++++++++++++------ 6 files changed, 157 insertions(+), 89 deletions(-) delete mode 100644 regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp index f1b2b0eaedd4fd..c30017cc8fe737 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp @@ -238,6 +238,9 @@ const char* DorisCompoundReader::getObjectName() const { } bool DorisCompoundReader::list(std::vector* names) const { + if (_closed || _entries == nullptr) { + _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed"); + } for (EntriesType::const_iterator i = _entries->begin(); i != _entries->end(); i++) { names->push_back(i->first); } @@ -245,6 +248,9 @@ bool DorisCompoundReader::list(std::vector* names) const { } bool DorisCompoundReader::fileExists(const char* name) const { + if (_closed || _entries == nullptr) { + _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed"); + } return _entries->exists((char*)name); } @@ -253,6 +259,9 @@ int64_t DorisCompoundReader::fileModified(const char* name) const { } int64_t DorisCompoundReader::fileLength(const char* name) const { + if (_closed || _entries == nullptr) { + _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed"); + } ReaderFileEntry* e = _entries->get((char*)name); if (e == nullptr) { char buf[CL_MAX_PATH + 30]; @@ -267,6 +276,10 @@ int64_t DorisCompoundReader::fileLength(const char* name) const { bool DorisCompoundReader::openInput(const char* name, std::unique_ptr& ret, CLuceneError& error, int32_t bufferSize) { + if (_closed || _entries == nullptr) { + error.set(CL_ERR_IO, "DorisCompoundReader is already closed"); + return false; + } lucene::store::IndexInput* tmp; bool success = openInput(name, tmp, error, bufferSize); if (success) { @@ -310,6 +323,10 @@ void DorisCompoundReader::close() { _CLDELETE(_stream) } if (_entries != nullptr) { + // The life cycle of _entries should be consistent with that of the DorisCompoundReader. + // DO NOT DELETE _entries here, it will be deleted in the destructor + // When directory is closed, all _entries are cleared. But the directory may be called in other places. + // If we delete the _entries object here, it will cause core dump. _entries->clear(); } if (_ram_dir) { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h index a30c39f8a2ffdd..1c7bc159b9ca09 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h @@ -67,6 +67,7 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory { private: lucene::store::RAMDirectory* _ram_dir = nullptr; CL_NS(store)::IndexInput* _stream = nullptr; + // The life cycle of _entries should be consistent with that of the DorisCompoundReader. EntriesType* _entries = nullptr; std::mutex _this_lock; bool _closed = false; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp index 29caf29936dddf..4befeba8991042 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp @@ -845,10 +845,8 @@ bool DorisRAMFSDirectory::doDeleteFile(const char* name) { SCOPED_LOCK_MUTEX(this->THIS_LOCK); sizeInBytes -= itr->second->sizeInBytes; filesMap->removeitr(itr); - return true; - } else { - return false; } + return true; } bool DorisRAMFSDirectory::deleteDirectory() { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp index 86a8f89e4c94e4..d85511722ec092 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp @@ -138,12 +138,14 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { try { DBUG_EXECUTE_IF("InvertedIndexColumnWriter::close_on_error_throw_exception", { _CLTHROWA(CL_ERR_IO, "debug point: close on error"); }) - if (_index_writer) { - _index_writer->close(); - } + // delete directory must be done before index_writer close + // because index_writer will close the directory if (_dir) { _dir->deleteDirectory(); } + if (_index_writer) { + _index_writer->close(); + } } catch (CLuceneError& e) { LOG(ERROR) << "InvertedIndexWriter close_on_error failure: " << e.what(); } @@ -692,12 +694,14 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { std::unique_ptr _doc = nullptr; lucene::document::Field* _field = nullptr; bool _single_field = true; + // Since _index_writer's write.lock is created by _dir.lockFactory, + // _dir must destruct after _index_writer, so _dir must be defined before _index_writer. + std::shared_ptr _dir = nullptr; std::unique_ptr _index_writer = nullptr; std::unique_ptr _analyzer = nullptr; std::unique_ptr _char_string_reader = nullptr; std::shared_ptr _bkd_writer = nullptr; InvertedIndexCtxSPtr _inverted_index_ctx = nullptr; - std::shared_ptr _dir = nullptr; const KeyCoder* _value_key_coder; const TabletIndex* _index_meta; InvertedIndexParserType _parser_type; diff --git a/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out b/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out deleted file mode 100644 index 4efc0928fb777a..00000000000000 --- a/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out +++ /dev/null @@ -1,22 +0,0 @@ --- This file is automatically generated. You should know what you did if you want to edit this --- !sql -- -863 - --- !sql -- -863 - --- !sql -- -863 - --- !sql -- -863 - --- !sql -- -0 - --- !sql -- -0 - --- !sql -- -0 - diff --git a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy index 4a7e3d45e90656..27f939dcbc0510 100644 --- a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy @@ -15,9 +15,35 @@ // specific language governing permissions and limitations // under the License. -suite("test_index_compound_directory_failure_injection", "nonConcurrent") { +suite("test_index_compound_directory_fault_injection", "nonConcurrent") { // define a sql table def testTable_dup = "httplogs_dup_compound" + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_config = { key, value -> + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + + def check_config = { String key, String value -> + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + for (Object ele in (List) configList) { + assert ele instanceof List + if (((List) ele)[0] == key) { + assertEquals(value, ((List) ele)[2]) + } + } + } + } def create_httplogs_dup_table = {testTablex -> // multi-line sql @@ -85,67 +111,111 @@ suite("test_index_compound_directory_failure_injection", "nonConcurrent") { } } - try { - sql "DROP TABLE IF EXISTS ${testTable_dup}" - create_httplogs_dup_table.call(testTable_dup) - - try { - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor") - load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') - } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor") - } - qt_sql "select COUNT() from ${testTable_dup} where request match 'images'" + def run_test = {String is_enable -> + boolean invertedIndexRAMDirEnable = false + boolean has_update_be_config = false try { - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close") - load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') - } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close") - } - qt_sql "select COUNT() from ${testTable_dup} where request match 'images'" - try { - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error") - load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') - } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error") - } - qt_sql "select COUNT() from ${testTable_dup} where request match 'images'" - try { - def test_index_compound_directory = "test_index_compound_directory1" - sql "DROP TABLE IF EXISTS ${test_index_compound_directory}" - create_httplogs_dup_table.call(test_index_compound_directory) - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer") - load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json') - qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'gif'" - try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}") - } catch(Exception ex) { - logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer, result: " + ex) - } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer") - } - try { - def test_index_compound_directory = "test_index_compound_directory2" - sql "DROP TABLE IF EXISTS ${test_index_compound_directory}" - create_httplogs_dup_table.call(test_index_compound_directory) - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer") - load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json') - qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'images'" - try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}") - } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer") - } - try { - def test_index_compound_directory = "test_index_compound_directory3" - sql "DROP TABLE IF EXISTS ${test_index_compound_directory}" - create_httplogs_dup_table.call(test_index_compound_directory) - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init") - load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json') - qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'png'" - try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}") + String backend_id; + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + for (Object ele in (List) configList) { + assert ele instanceof List + if (((List) ele)[0] == "inverted_index_ram_dir_enable") { + invertedIndexRAMDirEnable = Boolean.parseBoolean(((List) ele)[2]) + logger.info("inverted_index_ram_dir_enable: ${((List) ele)[2]}") + } + } + set_be_config.call("inverted_index_ram_dir_enable", is_enable) + has_update_be_config = true + // check updated config + check_config.call("inverted_index_ram_dir_enable", is_enable); + + sql "DROP TABLE IF EXISTS ${testTable_dup}" + create_httplogs_dup_table.call(testTable_dup) + + try { + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor") + load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor") + } + def res = sql "select COUNT() from ${testTable_dup} where request match 'images'" + assertEquals(863, res[0][0]) + sql "TRUNCATE TABLE ${testTable_dup}" + + try { + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close") + load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close") + } + res = sql "select COUNT() from ${testTable_dup} where request match 'images'" + assertEquals(0, res[0][0]) + sql "TRUNCATE TABLE ${testTable_dup}" + + try { + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error") + load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error") + } + res = sql "select COUNT() from ${testTable_dup} where request match 'images'" + assertEquals(0, res[0][0]) + sql "TRUNCATE TABLE ${testTable_dup}" + + try { + def test_index_compound_directory = "test_index_compound_directory1" + sql "DROP TABLE IF EXISTS ${test_index_compound_directory}" + create_httplogs_dup_table.call(test_index_compound_directory) + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer") + load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json') + res = sql "select COUNT() from ${test_index_compound_directory} where request match 'gif'" + try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}") + } catch(Exception ex) { + assertTrue(ex.toString().contains("failed to initialize storage reader")) + logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer, result: " + ex) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer") + } + + try { + def test_index_compound_directory = "test_index_compound_directory2" + sql "DROP TABLE IF EXISTS ${test_index_compound_directory}" + create_httplogs_dup_table.call(test_index_compound_directory) + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer") + load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json') + res = sql "select COUNT() from ${test_index_compound_directory} where request match 'images'" + assertEquals(0, res[0][0]) + try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}") + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer") + } + + try { + def test_index_compound_directory = "test_index_compound_directory3" + sql "DROP TABLE IF EXISTS ${test_index_compound_directory}" + create_httplogs_dup_table.call(test_index_compound_directory) + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init") + load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json') + res = sql "select COUNT() from ${test_index_compound_directory} where request match 'png'" + assertEquals(0, res[0][0]) + try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}") + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init") + } } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init") + if (has_update_be_config) { + set_be_config.call("inverted_index_ram_dir_enable", invertedIndexRAMDirEnable.toString()) + } } - } finally { - //try_sql("DROP TABLE IF EXISTS ${testTable}") } + + run_test.call("false") + run_test.call("true") }