Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[opt](inverted index)Optimize code to get rid of heap use after free #45745

Merged
merged 2 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions be/src/olap/rowset/segment_v2/inverted_index_compound_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,19 @@ const char* DorisCompoundReader::getObjectName() const {
}

bool DorisCompoundReader::list(std::vector<std::string>* names) const {
if (_closed || _entries == nullptr) {
_CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
}
for (EntriesType::const_iterator i = _entries->begin(); i != _entries->end(); i++) {
names->push_back(i->first);
}
return true;
}

bool DorisCompoundReader::fileExists(const char* name) const {
if (_closed || _entries == nullptr) {
_CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
}
return _entries->exists((char*)name);
}

Expand All @@ -253,6 +259,9 @@ int64_t DorisCompoundReader::fileModified(const char* name) const {
}

int64_t DorisCompoundReader::fileLength(const char* name) const {
if (_closed || _entries == nullptr) {
_CLTHROWA(CL_ERR_IO, "DorisCompoundReader is already closed");
}
ReaderFileEntry* e = _entries->get((char*)name);
if (e == nullptr) {
char buf[CL_MAX_PATH + 30];
Expand All @@ -267,6 +276,10 @@ int64_t DorisCompoundReader::fileLength(const char* name) const {
bool DorisCompoundReader::openInput(const char* name,
std::unique_ptr<lucene::store::IndexInput>& ret,
CLuceneError& error, int32_t bufferSize) {
if (_closed || _entries == nullptr) {
error.set(CL_ERR_IO, "DorisCompoundReader is already closed");
return false;
}
lucene::store::IndexInput* tmp;
bool success = openInput(name, tmp, error, bufferSize);
if (success) {
Expand Down Expand Up @@ -310,6 +323,10 @@ void DorisCompoundReader::close() {
_CLDELETE(_stream)
}
if (_entries != nullptr) {
// The life cycle of _entries should be consistent with that of the DorisCompoundReader.
// DO NOT DELETE _entries here, it will be deleted in the destructor
// When directory is closed, all _entries are cleared. But the directory may be called in other places.
// If we delete the _entries object here, it will cause core dump.
_entries->clear();
}
if (_ram_dir) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class CLUCENE_EXPORT DorisCompoundReader : public lucene::store::Directory {
private:
lucene::store::RAMDirectory* _ram_dir = nullptr;
CL_NS(store)::IndexInput* _stream = nullptr;
// The life cycle of _entries should be consistent with that of the DorisCompoundReader.
EntriesType* _entries = nullptr;
std::mutex _this_lock;
bool _closed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,10 +812,8 @@ bool DorisRAMFSDirectory::doDeleteFile(const char* name) {
SCOPED_LOCK_MUTEX(this->THIS_LOCK);
sizeInBytes -= itr->second->sizeInBytes;
filesMap->removeitr(itr);
return true;
} else {
return false;
}
return true;
}

bool DorisRAMFSDirectory::deleteDirectory() {
Expand Down
12 changes: 8 additions & 4 deletions be/src/olap/rowset/segment_v2/inverted_index_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,14 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
try {
DBUG_EXECUTE_IF("InvertedIndexColumnWriter::close_on_error_throw_exception",
{ _CLTHROWA(CL_ERR_IO, "debug point: close on error"); })
if (_index_writer) {
_index_writer->close();
}
// delete directory must be done before index_writer close
// because index_writer will close the directory
if (_dir) {
_dir->deleteDirectory();
}
if (_index_writer) {
_index_writer->close();
}
} catch (CLuceneError& e) {
LOG(ERROR) << "InvertedIndexWriter close_on_error failure: " << e.what();
}
Expand Down Expand Up @@ -714,12 +716,14 @@ class InvertedIndexColumnWriterImpl : public InvertedIndexColumnWriter {
std::unique_ptr<lucene::document::Document> _doc = nullptr;
lucene::document::Field* _field = nullptr;
bool _single_field = true;
// Since _index_writer's write.lock is created by _dir.lockFactory,
// _dir must destruct after _index_writer, so _dir must be defined before _index_writer.
std::shared_ptr<DorisFSDirectory> _dir = nullptr;
std::unique_ptr<lucene::index::IndexWriter> _index_writer = nullptr;
std::unique_ptr<lucene::analysis::Analyzer> _analyzer = nullptr;
std::unique_ptr<lucene::util::Reader> _char_string_reader = nullptr;
std::shared_ptr<lucene::util::bkd::bkd_writer> _bkd_writer = nullptr;
InvertedIndexCtxSPtr _inverted_index_ctx = nullptr;
std::shared_ptr<DorisFSDirectory> _dir = nullptr;
const KeyCoder* _value_key_coder;
const TabletIndex* _index_meta;
InvertedIndexParserType _parser_type;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,35 @@
// specific language governing permissions and limitations
// under the License.

suite("test_index_compound_directory_failure_injection", "nonConcurrent") {
suite("test_index_compound_directory_fault_injection", "nonConcurrent") {
// define a sql table
def testTable_dup = "httplogs_dup_compound"
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);

def set_be_config = { key, value ->
for (String backend_id: backendId_to_backendIP.keySet()) {
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
}
}

def check_config = { String key, String value ->
for (String backend_id: backendId_to_backendIP.keySet()) {
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List
for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == key) {
assertEquals(value, ((List<String>) ele)[2])
}
}
}
}

def create_httplogs_dup_table = {testTablex ->
// multi-line sql
Expand Down Expand Up @@ -85,67 +111,111 @@ suite("test_index_compound_directory_failure_injection", "nonConcurrent") {
}
}

try {
sql "DROP TABLE IF EXISTS ${testTable_dup}"
create_httplogs_dup_table.call(testTable_dup)

try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
def run_test = {String is_enable ->
boolean invertedIndexRAMDirEnable = false
boolean has_update_be_config = false
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
}
qt_sql "select COUNT() from ${testTable_dup} where request match 'images'"
try {
def test_index_compound_directory = "test_index_compound_directory1"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'gif'"
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} catch(Exception ex) {
logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer, result: " + ex)
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
}
try {
def test_index_compound_directory = "test_index_compound_directory2"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'images'"
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
}
try {
def test_index_compound_directory = "test_index_compound_directory3"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
qt_sql "select COUNT() from ${test_index_compound_directory} where request match 'png'"
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
String backend_id;
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))

logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
assert configList instanceof List

for (Object ele in (List) configList) {
assert ele instanceof List<String>
if (((List<String>) ele)[0] == "inverted_index_ram_dir_enable") {
invertedIndexRAMDirEnable = Boolean.parseBoolean(((List<String>) ele)[2])
logger.info("inverted_index_ram_dir_enable: ${((List<String>) ele)[2]}")
}
}
set_be_config.call("inverted_index_ram_dir_enable", is_enable)
has_update_be_config = true
// check updated config
check_config.call("inverted_index_ram_dir_enable", is_enable);

sql "DROP TABLE IF EXISTS ${testTable_dup}"
create_httplogs_dup_table.call(testTable_dup)

try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_destructor")
}
def res = sql "select COUNT() from ${testTable_dup} where request match 'images'"
assertEquals(863, res[0][0])
sql "TRUNCATE TABLE ${testTable_dup}"

try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_bufferedindexoutput_close")
}
res = sql "select COUNT() from ${testTable_dup} where request match 'images'"
assertEquals(0, res[0][0])
sql "TRUNCATE TABLE ${testTable_dup}"

try {
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
load_httplogs_data.call(testTable_dup, 'test_index_compound_directory', 'true', 'json', 'documents-1000.json')
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._set_writer_close_status_error")
}
res = sql "select COUNT() from ${testTable_dup} where request match 'images'"
assertEquals(0, res[0][0])
sql "TRUNCATE TABLE ${testTable_dup}"

try {
def test_index_compound_directory = "test_index_compound_directory1"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
res = sql "select COUNT() from ${test_index_compound_directory} where request match 'gif'"
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} catch(Exception ex) {
assertTrue(ex.toString().contains("failed to initialize storage reader"))
logger.info("_mock_append_data_error_in_fsindexoutput_flushBuffer, result: " + ex)
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._mock_append_data_error_in_fsindexoutput_flushBuffer")
}

try {
def test_index_compound_directory = "test_index_compound_directory2"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
res = sql "select COUNT() from ${test_index_compound_directory} where request match 'images'"
assertEquals(0, res[0][0])
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._status_error_in_fsindexoutput_flushBuffer")
}

try {
def test_index_compound_directory = "test_index_compound_directory3"
sql "DROP TABLE IF EXISTS ${test_index_compound_directory}"
create_httplogs_dup_table.call(test_index_compound_directory)
GetDebugPoint().enableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
load_httplogs_data.call(test_index_compound_directory, test_index_compound_directory, 'true', 'json', 'documents-1000.json')
res = sql "select COUNT() from ${test_index_compound_directory} where request match 'png'"
assertEquals(0, res[0][0])
try_sql("DROP TABLE IF EXISTS ${test_index_compound_directory}")
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
}
} finally {
GetDebugPoint().disableDebugPointForAllBEs("DorisFSDirectory::FSIndexOutput._throw_clucene_error_in_fsindexoutput_init")
if (has_update_be_config) {
set_be_config.call("inverted_index_ram_dir_enable", invertedIndexRAMDirEnable.toString())
}
}
} finally {
//try_sql("DROP TABLE IF EXISTS ${testTable}")
}

run_test.call("false")
run_test.call("true")
}
Loading