diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp index 5e6d8747a2d7ed..08ab1b6cc889b1 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp @@ -214,6 +214,9 @@ const char* DorisCompoundReader::getObjectName() const { } bool DorisCompoundReader::list(std::vector* names) const { + if (_closed || entries == nullptr) { + _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed"); + } for (EntriesType::const_iterator i = entries->begin(); i != entries->end(); i++) { names->push_back(i->first); } @@ -221,6 +224,9 @@ bool DorisCompoundReader::list(std::vector* names) const { } bool DorisCompoundReader::fileExists(const char* name) const { + if (_closed || entries == nullptr) { + _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed"); + } return entries->exists((char*)name); } @@ -237,6 +243,9 @@ int64_t DorisCompoundReader::fileModified(const char* name) const { } int64_t DorisCompoundReader::fileLength(const char* name) const { + if (_closed || entries == nullptr) { + _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed"); + } ReaderFileEntry* e = entries->get((char*)name); if (e == nullptr) { char buf[CL_MAX_PATH + 30]; @@ -251,6 +260,9 @@ int64_t DorisCompoundReader::fileLength(const char* name) const { bool DorisCompoundReader::openInput(const char* name, std::unique_ptr& ret, CLuceneError& error, int32_t bufferSize) { + if (_closed || entries == nullptr) { + _CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed"); + } lucene::store::IndexInput* tmp; bool success = openInput(name, tmp, error, bufferSize); if (success) { @@ -294,6 +306,10 @@ void DorisCompoundReader::close() { _CLDELETE(stream) } if (entries != nullptr) { + // The life cycle of _entries should be consistent with that of the DorisCompoundReader. + // DO NOT DELETE _entries here, it will be deleted in the destructor + // When directory is closed, all _entries are cleared. But the directory may be called in other places. + // If we delete the _entries object here, it will cause core dump. entries->clear(); } if (ram_dir) { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h index 1ca2d6ad3718c0..bc5ae415052f4f 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h +++ b/be/src/olap/rowset/segment_v2/inverted_index_compound_reader.h @@ -72,6 +72,7 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory { std::string directory; std::string file_name; CL_NS(store)::IndexInput* stream = nullptr; + // The life cycle of entries should be consistent with that of the DorisCompoundReader. EntriesType* entries = nullptr; std::mutex _this_lock; bool _closed = false; diff --git a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp index 9dbe0986755fc1..9e2e253c40471f 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_fs_directory.cpp @@ -634,10 +634,8 @@ bool DorisRAMFSDirectory::doDeleteFile(const char* name) { SCOPED_LOCK_MUTEX(this->THIS_LOCK); sizeInBytes -= itr->second->sizeInBytes; filesMap->removeitr(itr); - return true; - } else { - return false; } + return true; } bool DorisRAMFSDirectory::deleteDirectory() { diff --git a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp index a50b34b5fb1872..4e503685e686b8 100644 --- a/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp +++ b/be/src/olap/rowset/segment_v2/inverted_index_writer.cpp @@ -138,12 +138,14 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { void close_on_error() override { try { - if (_index_writer) { - _index_writer->close(); - } + // delete directory must be done before index_writer close + // because index_writer will close the directory if (_dir) { _dir->deleteDirectory(); } + if (_index_writer) { + _index_writer->close(); + } } catch (CLuceneError& e) { LOG(ERROR) << "InvertedIndexWriter close_on_error failure: " << e.what(); } @@ -664,11 +666,13 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter { std::unique_ptr _doc = nullptr; lucene::document::Field* _field = nullptr; bool _single_field = true; + // Since _index_writer's write.lock is created by _dir.lockFactory, + // _dir must destruct after _index_writer, so _dir must be defined before _index_writer. + DorisFSDirectory* _dir = nullptr; std::unique_ptr _index_writer = nullptr; std::unique_ptr _analyzer = nullptr; std::unique_ptr _char_string_reader = nullptr; std::shared_ptr _bkd_writer = nullptr; - DorisFSDirectory* _dir = nullptr; const KeyCoder* _value_key_coder; const TabletIndex* _index_meta; InvertedIndexParserType _parser_type; diff --git a/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out b/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out deleted file mode 100644 index 4efc0928fb777a..00000000000000 --- a/regression-test/data/fault_injection_p0/test_index_compound_directory_fault_injection.out +++ /dev/null @@ -1,22 +0,0 @@ --- This file is automatically generated. You should know what you did if you want to edit this --- !sql -- -863 - --- !sql -- -863 - --- !sql -- -863 - --- !sql -- -863 - --- !sql -- -0 - --- !sql -- -0 - --- !sql -- -0 - diff --git a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy index 759c409a850727..97cd829ec79b38 100644 --- a/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy +++ b/regression-test/suites/fault_injection_p0/test_index_compound_directory_fault_injection.groovy @@ -15,9 +15,35 @@ // specific language governing permissions and limitations // under the License. -suite("test_index_compound_directory_failure_injection", "nonConcurrent") { +suite("test_index_compound_directory_fault_injection", "nonConcurrent") { // define a sql table def testTable_dup = "httplogs_dup_compound" + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + def set_be_config = { key, value -> + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value) + logger.info("update config: code=" + code + ", out=" + out + ", err=" + err) + } + } + + def check_config = { String key, String value -> + for (String backend_id: backendId_to_backendIP.keySet()) { + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + for (Object ele in (List) configList) { + assert ele instanceof List + if (((List) ele)[0] == key) { + assertEquals(value, ((List) ele)[2]) + } + } + } + } def create_httplogs_dup_table = {testTablex -> // multi-line sql @@ -85,74 +111,111 @@ suite("test_index_compound_directory_failure_injection", "nonConcurrent") { } } - try { - sql "DROP TABLE IF EXISTS ${testTable_dup}" - create_httplogs_dup_table.call(testTable_dup) - - try { - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor") - load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') - } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor") - } - qt_sql "select COUNT() from ${testTable_dup} where request match 'images'" - try { - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close") - load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') - } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close") - } - qt_sql "select COUNT() from ${testTable_dup} where request match 'images'" - try { - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error") - load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') - } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_finalize_status_error") - } - qt_sql "select COUNT() from ${testTable_dup} where request match 'images'" - try { - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error") - load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') - } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error") - } - qt_sql "select COUNT() from ${testTable_dup} where request match 'images'" - try { - def test_index_compound_directory = "test_index_compound_directory1" - sql "DROP TABLE IF EXISTS ${test_index_compound_directory}" - create_httplogs_dup_table.call(test_index_compound_directory) - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer") - load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json') - qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'gif'" - try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}") - } catch(Exception ex) { - logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer, result: " + ex) - } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer") - } - try { - def test_index_compound_directory = "test_index_compound_directory2" - sql "DROP TABLE IF EXISTS ${test_index_compound_directory}" - create_httplogs_dup_table.call(test_index_compound_directory) - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer") - load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json') - qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'images'" - try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}") - } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer") - } + def run_test = {String is_enable -> + boolean invertedIndexRAMDirEnable = false + boolean has_update_be_config = false try { - def test_index_compound_directory = "test_index_compound_directory3" - sql "DROP TABLE IF EXISTS ${test_index_compound_directory}" - create_httplogs_dup_table.call(test_index_compound_directory) - GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init") - load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json') - qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'png'" - try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}") + String backend_id; + backend_id = backendId_to_backendIP.keySet()[0] + def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) + + logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def configList = parseJson(out.trim()) + assert configList instanceof List + + for (Object ele in (List) configList) { + assert ele instanceof List + if (((List) ele)[0] == "inverted_index_ram_dir_enable") { + invertedIndexRAMDirEnable = Boolean.parseBoolean(((List) ele)[2]) + logger.info("inverted_index_ram_dir_enable: ${((List) ele)[2]}") + } + } + set_be_config.call("inverted_index_ram_dir_enable", is_enable) + has_update_be_config = true + // check updated config + check_config.call("inverted_index_ram_dir_enable", is_enable); + + sql "DROP TABLE IF EXISTS ${testTable_dup}" + create_httplogs_dup_table.call(testTable_dup) + + try { + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor") + load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor") + } + def res = sql "select COUNT() from ${testTable_dup} where request match 'images'" + assertEquals(863, res[0][0]) + sql "TRUNCATE TABLE ${testTable_dup}" + + try { + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close") + load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close") + } + res = sql "select COUNT() from ${testTable_dup} where request match 'images'" + assertEquals(0, res[0][0]) + sql "TRUNCATE TABLE ${testTable_dup}" + + try { + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error") + load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json') + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error") + } + res = sql "select COUNT() from ${testTable_dup} where request match 'images'" + assertEquals(0, res[0][0]) + sql "TRUNCATE TABLE ${testTable_dup}" + + try { + def test_index_compound_directory = "test_index_compound_directory1" + sql "DROP TABLE IF EXISTS ${test_index_compound_directory}" + create_httplogs_dup_table.call(test_index_compound_directory) + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer") + load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json') + res = sql "select COUNT() from ${test_index_compound_directory} where request match 'gif'" + try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}") + } catch(Exception ex) { + assertTrue(ex.toString().contains("failed to initialize storage reader")) + logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer, result: " + ex) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer") + } + + try { + def test_index_compound_directory = "test_index_compound_directory2" + sql "DROP TABLE IF EXISTS ${test_index_compound_directory}" + create_httplogs_dup_table.call(test_index_compound_directory) + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer") + load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json') + res = sql "select COUNT() from ${test_index_compound_directory} where request match 'images'" + assertEquals(0, res[0][0]) + try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}") + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer") + } + + try { + def test_index_compound_directory = "test_index_compound_directory3" + sql "DROP TABLE IF EXISTS ${test_index_compound_directory}" + create_httplogs_dup_table.call(test_index_compound_directory) + GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init") + load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json') + res = sql "select COUNT() from ${test_index_compound_directory} where request match 'png'" + assertEquals(0, res[0][0]) + try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}") + } finally { + GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init") + } } finally { - GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init") + if (has_update_be_config) { + set_be_config.call("inverted_index_ram_dir_enable", invertedIndexRAMDirEnable.toString()) + } } - } finally { - //try_sql("DROP TABLE IF EXISTS ${testTable}") } + + run_test.call("false") + run_test.call("true") }