From 3dae8069f68bce45a23118afa9e426ffc5913335 Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Mon, 14 Oct 2024 20:52:07 -0400 Subject: [PATCH 1/8] Refactor StreamReader to modularize decoding logic. --- CMakeLists.txt | 5 +- src/clp_ffi_js/ir/StreamReader.cpp | 77 +++++++++++--------------- src/clp_ffi_js/ir/StreamReader.hpp | 19 +++++-- src/clp_ffi_js/ir/decoding_methods.cpp | 36 ++++++++++++ src/clp_ffi_js/ir/decoding_methods.hpp | 10 ++++ 5 files changed, 96 insertions(+), 51 deletions(-) create mode 100644 src/clp_ffi_js/ir/decoding_methods.cpp create mode 100644 src/clp_ffi_js/ir/decoding_methods.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 753dee44..80568fe2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -112,7 +112,10 @@ target_include_directories( target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/) -set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/StreamReader.cpp) +set(CLP_FFI_JS_SRC_MAIN + src/clp_ffi_js/ir/decoding_methods.cpp + src/clp_ffi_js/ir/StreamReader.cpp +) set(CLP_FFI_JS_SRC_CLP_CORE src/submodules/clp/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp index b9c86b6b..a4a01f20 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -15,7 +15,6 @@ #include #include -#include #include #include #include @@ -27,6 +26,7 @@ #include #include +#include #include #include @@ -48,52 +48,10 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> StreamReader { auto zstd_decompressor{std::make_unique()}; zstd_decompressor->open(data_buffer.data(), length); - bool is_four_bytes_encoding{true}; - if (auto const err{ - clp::ffi::ir_stream::get_encoding_type(*zstd_decompressor, is_four_bytes_encoding) - }; - clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err) - { - SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err); - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_MetadataCorrupted, - __FILENAME__, - __LINE__, - "Failed to decode encoding type." - }; - } - if (false == is_four_bytes_encoding) { - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Unsupported, - __FILENAME__, - __LINE__, - "IR stream uses unsupported encoding." - }; - } - - auto result{ - clp::ir::LogEventDeserializer::create(*zstd_decompressor) - }; - if (result.has_error()) { - auto const error_code{result.error()}; - SPDLOG_CRITICAL( - "Failed to create deserializer: {}:{}", - error_code.category().name(), - error_code.message() - ); - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Failure, - __FILENAME__, - __LINE__, - "Failed to create deserializer" - }; - } - - StreamReaderDataContext stream_reader_data_context{ - std::move(data_buffer), + auto stream_reader_data_context{create_deserializer_and_data_context( std::move(zstd_decompressor), - std::move(result.value()) - }; + std::move(data_buffer) + )}; return StreamReader{std::move(stream_reader_data_context)}; } @@ -251,6 +209,33 @@ StreamReader::StreamReader( std::move(stream_reader_data_context) )}, m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {} + +auto StreamReader::create_deserializer_and_data_context( + std::unique_ptr&& zstd_decompressor, + clp::Array&& data_buffer +) -> StreamReaderDataContext { + rewind_reader_and_verify_encoding_type(*zstd_decompressor); + + auto result{ + clp::ir::LogEventDeserializer::create(*zstd_decompressor) + }; + if (result.has_error()) { + auto const error_code{result.error()}; + SPDLOG_CRITICAL( + "Failed to create deserializer: {}:{}", + error_code.category().name(), + error_code.message() + ); + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Failure, + __FILENAME__, + __LINE__, + "Failed to create deserializer" + }; + } + + return {std::move(data_buffer), std::move(zstd_decompressor), std::move(result.value())}; +} } // namespace clp_ffi_js::ir namespace { diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index dec6c360..50fecb4c 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -1,9 +1,11 @@ #ifndef CLP_FFI_JS_IR_STREAM_READER_HPP #define CLP_FFI_JS_IR_STREAM_READER_HPP +#include #include #include #include +#include #include #include @@ -14,6 +16,8 @@ #include #include +using clp::ir::four_byte_encoded_variable_t; + namespace clp_ffi_js::ir { EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType); EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType); @@ -97,12 +101,19 @@ class StreamReader { private: // Constructor - explicit StreamReader(StreamReaderDataContext&& - stream_reader_data_context); + explicit StreamReader( + StreamReaderDataContext&& stream_reader_data_context + ); + + // Methods + [[nodiscard]] static auto create_deserializer_and_data_context( + std::unique_ptr&& zstd_decompressor, + clp::Array&& data_buffer + ) -> StreamReaderDataContext; // Variables - std::vector> m_encoded_log_events; - std::unique_ptr> + std::vector> m_encoded_log_events; + std::unique_ptr> m_stream_reader_data_context; FilteredLogEventsMap m_filtered_log_event_map; clp::TimestampPattern m_ts_pattern; diff --git a/src/clp_ffi_js/ir/decoding_methods.cpp b/src/clp_ffi_js/ir/decoding_methods.cpp new file mode 100644 index 00000000..4d59bf55 --- /dev/null +++ b/src/clp_ffi_js/ir/decoding_methods.cpp @@ -0,0 +1,36 @@ +#include "decoding_methods.hpp" + +#include +#include +#include +#include +#include + +#include + +namespace clp_ffi_js::ir { +auto rewind_reader_and_verify_encoding_type(clp::ReaderInterface& reader) -> void { + reader.seek_from_begin(0); + + bool is_four_bytes_encoding{true}; + if (auto const err{clp::ffi::ir_stream::get_encoding_type(reader, is_four_bytes_encoding)}; + clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err) + { + SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err); + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_MetadataCorrupted, + __FILENAME__, + __LINE__, + "Failed to decode encoding type." + }; + } + if (false == is_four_bytes_encoding) { + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Unsupported, + __FILENAME__, + __LINE__, + "IR stream uses unsupported encoding." + }; + } +} +} // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/decoding_methods.hpp b/src/clp_ffi_js/ir/decoding_methods.hpp new file mode 100644 index 00000000..9e24280d --- /dev/null +++ b/src/clp_ffi_js/ir/decoding_methods.hpp @@ -0,0 +1,10 @@ +#ifndef CLP_FFI_JS_IR_DECODING_METHODS_HPP +#define CLP_FFI_JS_IR_DECODING_METHODS_HPP + +#include + +namespace clp_ffi_js::ir { +auto rewind_reader_and_verify_encoding_type(clp::ReaderInterface& reader) -> void; +} // namespace clp_ffi_js::ir + +#endif // CLP_FFI_JS_IR_DECODING_METHODS_HPP From ab32045c2cdada8a4c2f0b64bb58c998afdb41a3 Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Mon, 14 Oct 2024 20:59:17 -0400 Subject: [PATCH 2/8] Add a comment section for methods in StreamReader.hpp. --- src/clp_ffi_js/ir/StreamReader.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index 50fecb4c..8cc84469 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -56,6 +56,7 @@ class StreamReader { // Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`. auto operator=(StreamReader&&) -> StreamReader& = delete; + // Methods /** * @return The number of events buffered. */ From 8dc3dff0290ad1edac822f3e18410ddc453ce8c6 Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Mon, 14 Oct 2024 21:10:29 -0400 Subject: [PATCH 3/8] Add `get_version` to decoding methods. --- src/clp_ffi_js/ir/decoding_methods.cpp | 45 ++++++++++++++++++++++++++ src/clp_ffi_js/ir/decoding_methods.hpp | 3 ++ 2 files changed, 48 insertions(+) diff --git a/src/clp_ffi_js/ir/decoding_methods.cpp b/src/clp_ffi_js/ir/decoding_methods.cpp index 4d59bf55..81063f6a 100644 --- a/src/clp_ffi_js/ir/decoding_methods.cpp +++ b/src/clp_ffi_js/ir/decoding_methods.cpp @@ -1,14 +1,59 @@ #include "decoding_methods.hpp" +#include +#include +#include +#include +#include + #include #include +#include #include #include +#include #include #include namespace clp_ffi_js::ir { +auto get_version(clp::ReaderInterface& reader) -> std::string { + // The encoding type bytes must be consumed before the metadata can be read. + rewind_reader_and_verify_encoding_type(reader); + + // Deserialize metadata bytes from preamble. + clp::ffi::ir_stream::encoded_tag_t metadata_type{}; + std::vector metadata_bytes; + auto const deserialize_preamble_result{ + clp::ffi::ir_stream::deserialize_preamble(reader, metadata_type, metadata_bytes) + }; + if (clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != deserialize_preamble_result) { + SPDLOG_CRITICAL( + "Failed to deserialize preamble for version reading: {}", + deserialize_preamble_result + ); + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Failure, + __FILENAME__, + __LINE__, + "Failed to deserialize preamble for version reading." + }; + } + + // Deserialize metadata bytes which is encoded in JSON. + std::string_view const metadata_view{ + clp::size_checked_pointer_cast(metadata_bytes.data()), + metadata_bytes.size() + }; + nlohmann::json const metadata = nlohmann::json::parse(metadata_view); + + // Retrieve version from metadata. + auto const& version{metadata.at(clp::ffi::ir_stream::cProtocol::Metadata::VersionKey)}; + SPDLOG_INFO("The version is {}", version); + + return version; +} + auto rewind_reader_and_verify_encoding_type(clp::ReaderInterface& reader) -> void { reader.seek_from_begin(0); diff --git a/src/clp_ffi_js/ir/decoding_methods.hpp b/src/clp_ffi_js/ir/decoding_methods.hpp index 9e24280d..6791fd21 100644 --- a/src/clp_ffi_js/ir/decoding_methods.hpp +++ b/src/clp_ffi_js/ir/decoding_methods.hpp @@ -1,9 +1,12 @@ #ifndef CLP_FFI_JS_IR_DECODING_METHODS_HPP #define CLP_FFI_JS_IR_DECODING_METHODS_HPP +#include + #include namespace clp_ffi_js::ir { +auto get_version(clp::ReaderInterface& reader) -> std::string; auto rewind_reader_and_verify_encoding_type(clp::ReaderInterface& reader) -> void; } // namespace clp_ffi_js::ir From eba1d30c7a3e7e73f7668f59a5a82ac99b945bb7 Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Mon, 14 Oct 2024 21:12:50 -0400 Subject: [PATCH 4/8] Move `#include ` from StreamReader.hpp to StreamReader.cpp. --- src/clp_ffi_js/ir/StreamReader.cpp | 1 + src/clp_ffi_js/ir/StreamReader.hpp | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp index a4a01f20..86224f88 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index 8cc84469..a370bbc6 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -10,7 +10,6 @@ #include #include -#include #include #include From 5398d1d403b84cb5af579b289d8cfb68adb688d4 Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Mon, 14 Oct 2024 21:16:08 -0400 Subject: [PATCH 5/8] Rename StreamReader -> IRStreamReader. --- CMakeLists.txt | 2 +- .../{StreamReader.cpp => IRStreamReader.cpp} | 36 +++++++++---------- .../{StreamReader.hpp => IRStreamReader.hpp} | 24 ++++++------- src/clp_ffi_js/ir/StreamReaderDataContext.hpp | 2 +- 4 files changed, 32 insertions(+), 32 deletions(-) rename src/clp_ffi_js/ir/{StreamReader.cpp => IRStreamReader.cpp} (86%) rename src/clp_ffi_js/ir/{StreamReader.hpp => IRStreamReader.hpp} (87%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 80568fe2..86473c50 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -114,7 +114,7 @@ target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/) set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/decoding_methods.cpp - src/clp_ffi_js/ir/StreamReader.cpp + src/clp_ffi_js/ir/IRStreamReader.cpp ) set(CLP_FFI_JS_SRC_CLP_CORE diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/IRStreamReader.cpp similarity index 86% rename from src/clp_ffi_js/ir/StreamReader.cpp rename to src/clp_ffi_js/ir/IRStreamReader.cpp index 86224f88..d6c561f7 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/IRStreamReader.cpp @@ -1,4 +1,4 @@ -#include "StreamReader.hpp" +#include "IRStreamReader.hpp" #include #include @@ -35,9 +35,9 @@ using namespace std::literals::string_literals; using clp::ir::four_byte_encoded_variable_t; namespace clp_ffi_js::ir { -auto StreamReader::create(DataArrayTsType const& data_array) -> StreamReader { +auto IRStreamReader::create(DataArrayTsType const& data_array) -> IRStreamReader { auto const length{data_array["length"].as()}; - SPDLOG_INFO("StreamReader::create: got buffer of length={}", length); + SPDLOG_INFO("IRStreamReader::create: got buffer of length={}", length); // Copy array from JavaScript to C++ clp::Array data_buffer{length}; @@ -53,14 +53,14 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> StreamReader { std::move(zstd_decompressor), std::move(data_buffer) )}; - return StreamReader{std::move(stream_reader_data_context)}; + return IRStreamReader{std::move(stream_reader_data_context)}; } -auto StreamReader::get_num_events_buffered() const -> size_t { +auto IRStreamReader::get_num_events_buffered() const -> size_t { return m_encoded_log_events.size(); } -auto StreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { +auto IRStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { if (false == m_filtered_log_event_map.has_value()) { return FilteredLogEventMapTsType{emscripten::val::null()}; } @@ -68,7 +68,7 @@ auto StreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsTy return FilteredLogEventMapTsType{emscripten::val::array(m_filtered_log_event_map.value())}; } -void StreamReader::filter_log_events(emscripten::val const& log_level_filter) { +void IRStreamReader::filter_log_events(emscripten::val const& log_level_filter) { if (log_level_filter.isNull()) { m_filtered_log_event_map.reset(); return; @@ -90,7 +90,7 @@ void StreamReader::filter_log_events(emscripten::val const& log_level_filter) { } } -auto StreamReader::deserialize_stream() -> size_t { +auto IRStreamReader::deserialize_stream() -> size_t { if (nullptr == m_stream_reader_data_context) { return m_encoded_log_events.size(); } @@ -150,7 +150,7 @@ auto StreamReader::deserialize_stream() -> size_t { return m_encoded_log_events.size(); } -auto StreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const +auto IRStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType { if (use_filter && false == m_filtered_log_event_map.has_value()) { return DecodedResultsTsType{emscripten::val::null()}; @@ -202,7 +202,7 @@ auto StreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filte return DecodedResultsTsType(results); } -StreamReader::StreamReader( +IRStreamReader::IRStreamReader( StreamReaderDataContext&& stream_reader_data_context ) : m_stream_reader_data_context{std::make_unique< @@ -211,7 +211,7 @@ StreamReader::StreamReader( )}, m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {} -auto StreamReader::create_deserializer_and_data_context( +auto IRStreamReader::create_deserializer_and_data_context( std::unique_ptr&& zstd_decompressor, clp::Array&& data_buffer ) -> StreamReaderDataContext { @@ -247,21 +247,21 @@ EMSCRIPTEN_BINDINGS(ClpIrStreamReader) { ); emscripten::register_type("number[] | null"); - emscripten::class_("ClpIrStreamReader") + emscripten::class_("ClpIrStreamReader") .constructor( - &clp_ffi_js::ir::StreamReader::create, + &clp_ffi_js::ir::IRStreamReader::create, emscripten::return_value_policy::take_ownership() ) .function( "getNumEventsBuffered", - &clp_ffi_js::ir::StreamReader::get_num_events_buffered + &clp_ffi_js::ir::IRStreamReader::get_num_events_buffered ) .function( "getFilteredLogEventMap", - &clp_ffi_js::ir::StreamReader::get_filtered_log_event_map + &clp_ffi_js::ir::IRStreamReader::get_filtered_log_event_map ) - .function("filterLogEvents", &clp_ffi_js::ir::StreamReader::filter_log_events) - .function("deserializeStream", &clp_ffi_js::ir::StreamReader::deserialize_stream) - .function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range); + .function("filterLogEvents", &clp_ffi_js::ir::IRStreamReader::filter_log_events) + .function("deserializeStream", &clp_ffi_js::ir::IRStreamReader::deserialize_stream) + .function("decodeRange", &clp_ffi_js::ir::IRStreamReader::decode_range); } } // namespace diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/IRStreamReader.hpp similarity index 87% rename from src/clp_ffi_js/ir/StreamReader.hpp rename to src/clp_ffi_js/ir/IRStreamReader.hpp index a370bbc6..db267afa 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/IRStreamReader.hpp @@ -1,5 +1,5 @@ -#ifndef CLP_FFI_JS_IR_STREAM_READER_HPP -#define CLP_FFI_JS_IR_STREAM_READER_HPP +#ifndef CLP_FFI_JS_IR_IR_STREAM_READER_HPP +#define CLP_FFI_JS_IR_IR_STREAM_READER_HPP #include #include @@ -26,7 +26,7 @@ EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType); * Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded * log events. */ -class StreamReader { +class IRStreamReader { public: /** * Mapping between an index in the filtered log events collection to an index in the unfiltered @@ -35,25 +35,25 @@ class StreamReader { using FilteredLogEventsMap = std::optional>; /** - * Creates a StreamReader to read from the given array. + * Creates a IRStreamReader to read from the given array. * * @param data_array An array containing a Zstandard-compressed IR stream. * @return The created instance. * @throw ClpFfiJsException if any error occurs. */ - [[nodiscard]] static auto create(DataArrayTsType const& data_array) -> StreamReader; + [[nodiscard]] static auto create(DataArrayTsType const& data_array) -> IRStreamReader; // Destructor - ~StreamReader() = default; + ~IRStreamReader() = default; // Disable copy constructor and assignment operator - StreamReader(StreamReader const&) = delete; - auto operator=(StreamReader const&) -> StreamReader& = delete; + IRStreamReader(IRStreamReader const&) = delete; + auto operator=(IRStreamReader const&) -> IRStreamReader& = delete; // Define default move constructor - StreamReader(StreamReader&&) = default; + IRStreamReader(IRStreamReader&&) = default; // Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`. - auto operator=(StreamReader&&) -> StreamReader& = delete; + auto operator=(IRStreamReader&&) -> IRStreamReader& = delete; // Methods /** @@ -101,7 +101,7 @@ class StreamReader { private: // Constructor - explicit StreamReader( + explicit IRStreamReader( StreamReaderDataContext&& stream_reader_data_context ); @@ -120,4 +120,4 @@ class StreamReader { }; } // namespace clp_ffi_js::ir -#endif // CLP_FFI_JS_IR_STREAM_READER_HPP +#endif // CLP_FFI_JS_IR_IR_STREAM_READER_HPP diff --git a/src/clp_ffi_js/ir/StreamReaderDataContext.hpp b/src/clp_ffi_js/ir/StreamReaderDataContext.hpp index 091b0b05..c6f8134c 100644 --- a/src/clp_ffi_js/ir/StreamReaderDataContext.hpp +++ b/src/clp_ffi_js/ir/StreamReaderDataContext.hpp @@ -11,7 +11,7 @@ namespace clp_ffi_js::ir { /** - * The data context for a `StreamReader`. It encapsulates a chain of the following resources: + * The data context for a `IRStreamReader`. It encapsulates a chain of the following resources: * A `clp::ir::LogEventDeserializer` that reads from a * `clp::streaming_compression::zstd::Decompressor`, which in turn reads from a `clp::Array`. * @tparam encoded_variable_t Type of encoded variables encoded in the stream. From cfd55ca2af9b456eeacef1acfffabe93722e5134 Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Mon, 14 Oct 2024 21:18:18 -0400 Subject: [PATCH 6/8] Rename ClpIrStreamReader to ClpIRStreamReader in bindings. --- src/clp_ffi_js/ir/IRStreamReader.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/clp_ffi_js/ir/IRStreamReader.cpp b/src/clp_ffi_js/ir/IRStreamReader.cpp index d6c561f7..e48b7310 100644 --- a/src/clp_ffi_js/ir/IRStreamReader.cpp +++ b/src/clp_ffi_js/ir/IRStreamReader.cpp @@ -240,14 +240,14 @@ auto IRStreamReader::create_deserializer_and_data_context( } // namespace clp_ffi_js::ir namespace { -EMSCRIPTEN_BINDINGS(ClpIrStreamReader) { +EMSCRIPTEN_BINDINGS(ClpIRStreamReader) { emscripten::register_type("Uint8Array"); emscripten::register_type( "Array<[string, number, number, number]>" ); emscripten::register_type("number[] | null"); - emscripten::class_("ClpIrStreamReader") + emscripten::class_("ClpIRStreamReader") .constructor( &clp_ffi_js::ir::IRStreamReader::create, emscripten::return_value_policy::take_ownership() From d1ed2f02fc739e065d5ff8fcd1de3994dba3f01b Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Mon, 14 Oct 2024 21:24:07 -0400 Subject: [PATCH 7/8] Extract a StreamReader base class from IRStreamReader. --- CMakeLists.txt | 1 + src/clp_ffi_js/ir/IRStreamReader.cpp | 11 ++-- src/clp_ffi_js/ir/IRStreamReader.hpp | 23 ++++--- src/clp_ffi_js/ir/StreamReader.cpp | 70 ++++++++++++++++++++++ src/clp_ffi_js/ir/StreamReader.hpp | 89 ++++++++++++++++++++++++++++ 5 files changed, 175 insertions(+), 19 deletions(-) create mode 100644 src/clp_ffi_js/ir/StreamReader.cpp create mode 100644 src/clp_ffi_js/ir/StreamReader.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 86473c50..4f3b8c6f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -115,6 +115,7 @@ target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/) set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/decoding_methods.cpp src/clp_ffi_js/ir/IRStreamReader.cpp + src/clp_ffi_js/ir/StreamReader.cpp ) set(CLP_FFI_JS_SRC_CLP_CORE diff --git a/src/clp_ffi_js/ir/IRStreamReader.cpp b/src/clp_ffi_js/ir/IRStreamReader.cpp index e48b7310..ecab8a79 100644 --- a/src/clp_ffi_js/ir/IRStreamReader.cpp +++ b/src/clp_ffi_js/ir/IRStreamReader.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include using namespace std::literals::string_literals; @@ -241,13 +242,9 @@ auto IRStreamReader::create_deserializer_and_data_context( namespace { EMSCRIPTEN_BINDINGS(ClpIRStreamReader) { - emscripten::register_type("Uint8Array"); - emscripten::register_type( - "Array<[string, number, number, number]>" - ); - emscripten::register_type("number[] | null"); - - emscripten::class_("ClpIRStreamReader") + emscripten::class_< + clp_ffi_js::ir::IRStreamReader, + emscripten::base>("ClpIRStreamReader") .constructor( &clp_ffi_js::ir::IRStreamReader::create, emscripten::return_value_policy::take_ownership() diff --git a/src/clp_ffi_js/ir/IRStreamReader.hpp b/src/clp_ffi_js/ir/IRStreamReader.hpp index db267afa..137a2996 100644 --- a/src/clp_ffi_js/ir/IRStreamReader.hpp +++ b/src/clp_ffi_js/ir/IRStreamReader.hpp @@ -13,20 +13,19 @@ #include #include +#include #include using clp::ir::four_byte_encoded_variable_t; namespace clp_ffi_js::ir { -EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType); -EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType); -EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType); - /** * Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded * log events. */ -class IRStreamReader { +class IRStreamReader : public StreamReader { + friend StreamReader; + public: /** * Mapping between an index in the filtered log events collection to an index in the unfiltered @@ -44,7 +43,7 @@ class IRStreamReader { [[nodiscard]] static auto create(DataArrayTsType const& data_array) -> IRStreamReader; // Destructor - ~IRStreamReader() = default; + ~IRStreamReader() override = default; // Disable copy constructor and assignment operator IRStreamReader(IRStreamReader const&) = delete; @@ -59,19 +58,19 @@ class IRStreamReader { /** * @return The number of events buffered. */ - [[nodiscard]] auto get_num_events_buffered() const -> size_t; + [[nodiscard]] auto get_num_events_buffered() const -> size_t override; /** * @return The filtered log events map. */ - [[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType; + [[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType override; /** * Generates a filtered collection from all log events. * * @param log_level_filter Array of selected log levels */ - void filter_log_events(emscripten::val const& log_level_filter); + void filter_log_events(emscripten::val const& log_level_filter) override; /** * Deserializes all log events in the stream. After the stream has been exhausted, it will be @@ -79,7 +78,7 @@ class IRStreamReader { * * @return The number of successfully deserialized ("valid") log events. */ - [[nodiscard]] auto deserialize_stream() -> size_t; + [[nodiscard]] auto deserialize_stream() -> size_t override; /** * Decodes log events in the range `[beginIdx, endIdx)` of the filtered or unfiltered @@ -96,8 +95,8 @@ class IRStreamReader { * @return null if any log event in the range doesn't exist (e.g. the range exceeds the number * of log events in the collection). */ - [[nodiscard]] auto - decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType; + [[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType override; private: // Constructor diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp new file mode 100644 index 00000000..6906d7e0 --- /dev/null +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -0,0 +1,70 @@ +#include "StreamReader.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace clp_ffi_js::ir { +auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr { + auto const length{data_array["length"].as()}; + SPDLOG_INFO("KVPairIRStreamReader::create: got buffer of length={}", length); + + // Copy array from JavaScript to C++ + clp::Array data_buffer{length}; + // NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast) + emscripten::val::module_property("HEAPU8") + .call("set", data_array, reinterpret_cast(data_buffer.data())); + // NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast) + + auto zstd_decompressor{std::make_unique()}; + zstd_decompressor->open(data_buffer.data(), length); + + auto const version{get_version(*zstd_decompressor)}; + if (version == "v0.0.0") { + auto stream_reader_data_context{IRStreamReader::create_deserializer_and_data_context( + std::move(zstd_decompressor), + std::move(data_buffer) + )}; + + return std::unique_ptr( + new IRStreamReader(std::move(stream_reader_data_context)) + ); + } + + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Unsupported, + __FILENAME__, + __LINE__, + std::format("Unable to create stream reader for IR data with version {}.", version) + }; +} +} // namespace clp_ffi_js::ir + +namespace { +EMSCRIPTEN_BINDINGS(ClpStreamReader) { + emscripten::register_type("Uint8Array"); + emscripten::register_type( + "Array<[string, number, number, number]>" + ); + emscripten::register_type("number[] | null"); + + emscripten::class_("ClpStreamReader") + .constructor( + &clp_ffi_js::ir::StreamReader::create, + emscripten::return_value_policy::take_ownership() + ); +} +} // namespace diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp new file mode 100644 index 00000000..ad9cbaa0 --- /dev/null +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -0,0 +1,89 @@ +#ifndef CLP_FFI_JS_IR_STREAM_READER_HPP +#define CLP_FFI_JS_IR_STREAM_READER_HPP + +#include +#include + +#include + +namespace clp_ffi_js::ir { +EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType); +EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType); +EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType); + +/** + * Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded + * log events. + */ +class StreamReader { +public: + /** + * Creates a StreamReader to read from the given array. + * + * @param data_array An array containing a Zstandard-compressed IR stream. + * @return The created instance. + * @throw ClpFfiJsException if any error occurs. + */ + [[nodiscard]] static auto create(DataArrayTsType const& data_array + ) -> std::unique_ptr; + + // Destructor + virtual ~StreamReader() = default; + + // Disable copy constructor and assignment operator + StreamReader(StreamReader const&) = delete; + auto operator=(StreamReader const&) -> StreamReader& = delete; + + // Define default move constructor + StreamReader(StreamReader&&) = default; + // Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`. + auto operator=(StreamReader&&) -> StreamReader& = delete; + + /** + * @return The number of events buffered. + */ + [[nodiscard]] virtual auto get_num_events_buffered() const -> size_t = 0; + + /** + * @return The filtered log events map. + */ + [[nodiscard]] virtual auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType = 0; + + /** + * Generates a filtered collection from all log events. + * + * @param log_level_filter Array of selected log levels + */ + virtual void filter_log_events(emscripten::val const& log_level_filter) = 0; + + /** + * Deserializes all log events in the stream. After the stream has been exhausted, it will be + * deallocated. + * + * @return The number of successfully deserialized ("valid") log events. + */ + [[nodiscard]] virtual auto deserialize_stream() -> size_t = 0; + + /** + * Decodes log events in the range `[beginIdx, endIdx)` of the filtered or unfiltered + * (depending on the value of `useFilter`) log events collection. + * + * @param begin_idx + * @param end_idx + * @param use_filter Whether to decode from the filtered or unfiltered log events collection. + * @return An array where each element is a decoded log event represented by an array of: + * - The log event's message + * - The log event's timestamp as milliseconds since the Unix epoch + * - The log event's log level as an integer that indexes into `cLogLevelNames` + * - The log event's number (1-indexed) in the stream + * @return null if any log event in the range doesn't exist (e.g. the range exceeds the number + * of log events in the collection). + */ + [[nodiscard]] virtual auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType = 0; + +protected: + explicit StreamReader() = default; +}; +} // namespace clp_ffi_js::ir +#endif // CLP_FFI_JS_IR_STREAM_READER_HPP From bf5e4c9563bc6a2c25de5514161b997ab396b1a9 Mon Sep 17 00:00:00 2001 From: Junhao Liao Date: Mon, 14 Oct 2024 21:37:44 -0400 Subject: [PATCH 8/8] Rename IRStreamReader -> IrStreamReader. --- CMakeLists.txt | 2 +- ...{IRStreamReader.cpp => IrStreamReader.cpp} | 40 +++++++++---------- ...{IRStreamReader.hpp => IrStreamReader.hpp} | 18 ++++----- src/clp_ffi_js/ir/StreamReader.cpp | 8 ++-- src/clp_ffi_js/ir/StreamReaderDataContext.hpp | 2 +- 5 files changed, 35 insertions(+), 35 deletions(-) rename src/clp_ffi_js/ir/{IRStreamReader.cpp => IrStreamReader.cpp} (88%) rename src/clp_ffi_js/ir/{IRStreamReader.hpp => IrStreamReader.hpp} (90%) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4f3b8c6f..2d584386 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -114,7 +114,7 @@ target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/) set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/decoding_methods.cpp - src/clp_ffi_js/ir/IRStreamReader.cpp + src/clp_ffi_js/ir/IrStreamReader.cpp src/clp_ffi_js/ir/StreamReader.cpp ) diff --git a/src/clp_ffi_js/ir/IRStreamReader.cpp b/src/clp_ffi_js/ir/IrStreamReader.cpp similarity index 88% rename from src/clp_ffi_js/ir/IRStreamReader.cpp rename to src/clp_ffi_js/ir/IrStreamReader.cpp index ecab8a79..8ec58c2e 100644 --- a/src/clp_ffi_js/ir/IRStreamReader.cpp +++ b/src/clp_ffi_js/ir/IrStreamReader.cpp @@ -1,4 +1,4 @@ -#include "IRStreamReader.hpp" +#include "IrStreamReader.hpp" #include #include @@ -36,9 +36,9 @@ using namespace std::literals::string_literals; using clp::ir::four_byte_encoded_variable_t; namespace clp_ffi_js::ir { -auto IRStreamReader::create(DataArrayTsType const& data_array) -> IRStreamReader { +auto IrStreamReader::create(DataArrayTsType const& data_array) -> IrStreamReader { auto const length{data_array["length"].as()}; - SPDLOG_INFO("IRStreamReader::create: got buffer of length={}", length); + SPDLOG_INFO("IrStreamReader::create: got buffer of length={}", length); // Copy array from JavaScript to C++ clp::Array data_buffer{length}; @@ -54,14 +54,14 @@ auto IRStreamReader::create(DataArrayTsType const& data_array) -> IRStreamReader std::move(zstd_decompressor), std::move(data_buffer) )}; - return IRStreamReader{std::move(stream_reader_data_context)}; + return IrStreamReader{std::move(stream_reader_data_context)}; } -auto IRStreamReader::get_num_events_buffered() const -> size_t { +auto IrStreamReader::get_num_events_buffered() const -> size_t { return m_encoded_log_events.size(); } -auto IRStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { +auto IrStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { if (false == m_filtered_log_event_map.has_value()) { return FilteredLogEventMapTsType{emscripten::val::null()}; } @@ -69,7 +69,7 @@ auto IRStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTs return FilteredLogEventMapTsType{emscripten::val::array(m_filtered_log_event_map.value())}; } -void IRStreamReader::filter_log_events(emscripten::val const& log_level_filter) { +void IrStreamReader::filter_log_events(emscripten::val const& log_level_filter) { if (log_level_filter.isNull()) { m_filtered_log_event_map.reset(); return; @@ -91,7 +91,7 @@ void IRStreamReader::filter_log_events(emscripten::val const& log_level_filter) } } -auto IRStreamReader::deserialize_stream() -> size_t { +auto IrStreamReader::deserialize_stream() -> size_t { if (nullptr == m_stream_reader_data_context) { return m_encoded_log_events.size(); } @@ -151,7 +151,7 @@ auto IRStreamReader::deserialize_stream() -> size_t { return m_encoded_log_events.size(); } -auto IRStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const +auto IrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType { if (use_filter && false == m_filtered_log_event_map.has_value()) { return DecodedResultsTsType{emscripten::val::null()}; @@ -203,7 +203,7 @@ auto IRStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_fil return DecodedResultsTsType(results); } -IRStreamReader::IRStreamReader( +IrStreamReader::IrStreamReader( StreamReaderDataContext&& stream_reader_data_context ) : m_stream_reader_data_context{std::make_unique< @@ -212,7 +212,7 @@ IRStreamReader::IRStreamReader( )}, m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {} -auto IRStreamReader::create_deserializer_and_data_context( +auto IrStreamReader::create_deserializer_and_data_context( std::unique_ptr&& zstd_decompressor, clp::Array&& data_buffer ) -> StreamReaderDataContext { @@ -241,24 +241,24 @@ auto IRStreamReader::create_deserializer_and_data_context( } // namespace clp_ffi_js::ir namespace { -EMSCRIPTEN_BINDINGS(ClpIRStreamReader) { +EMSCRIPTEN_BINDINGS(ClpIrStreamReader) { emscripten::class_< - clp_ffi_js::ir::IRStreamReader, - emscripten::base>("ClpIRStreamReader") + clp_ffi_js::ir::IrStreamReader, + emscripten::base>("ClpIrStreamReader") .constructor( - &clp_ffi_js::ir::IRStreamReader::create, + &clp_ffi_js::ir::IrStreamReader::create, emscripten::return_value_policy::take_ownership() ) .function( "getNumEventsBuffered", - &clp_ffi_js::ir::IRStreamReader::get_num_events_buffered + &clp_ffi_js::ir::IrStreamReader::get_num_events_buffered ) .function( "getFilteredLogEventMap", - &clp_ffi_js::ir::IRStreamReader::get_filtered_log_event_map + &clp_ffi_js::ir::IrStreamReader::get_filtered_log_event_map ) - .function("filterLogEvents", &clp_ffi_js::ir::IRStreamReader::filter_log_events) - .function("deserializeStream", &clp_ffi_js::ir::IRStreamReader::deserialize_stream) - .function("decodeRange", &clp_ffi_js::ir::IRStreamReader::decode_range); + .function("filterLogEvents", &clp_ffi_js::ir::IrStreamReader::filter_log_events) + .function("deserializeStream", &clp_ffi_js::ir::IrStreamReader::deserialize_stream) + .function("decodeRange", &clp_ffi_js::ir::IrStreamReader::decode_range); } } // namespace diff --git a/src/clp_ffi_js/ir/IRStreamReader.hpp b/src/clp_ffi_js/ir/IrStreamReader.hpp similarity index 90% rename from src/clp_ffi_js/ir/IRStreamReader.hpp rename to src/clp_ffi_js/ir/IrStreamReader.hpp index 137a2996..768742c6 100644 --- a/src/clp_ffi_js/ir/IRStreamReader.hpp +++ b/src/clp_ffi_js/ir/IrStreamReader.hpp @@ -23,7 +23,7 @@ namespace clp_ffi_js::ir { * Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded * log events. */ -class IRStreamReader : public StreamReader { +class IrStreamReader : public StreamReader { friend StreamReader; public: @@ -34,25 +34,25 @@ class IRStreamReader : public StreamReader { using FilteredLogEventsMap = std::optional>; /** - * Creates a IRStreamReader to read from the given array. + * Creates a IrStreamReader to read from the given array. * * @param data_array An array containing a Zstandard-compressed IR stream. * @return The created instance. * @throw ClpFfiJsException if any error occurs. */ - [[nodiscard]] static auto create(DataArrayTsType const& data_array) -> IRStreamReader; + [[nodiscard]] static auto create(DataArrayTsType const& data_array) -> IrStreamReader; // Destructor - ~IRStreamReader() override = default; + ~IrStreamReader() override = default; // Disable copy constructor and assignment operator - IRStreamReader(IRStreamReader const&) = delete; - auto operator=(IRStreamReader const&) -> IRStreamReader& = delete; + IrStreamReader(IrStreamReader const&) = delete; + auto operator=(IrStreamReader const&) -> IrStreamReader& = delete; // Define default move constructor - IRStreamReader(IRStreamReader&&) = default; + IrStreamReader(IrStreamReader&&) = default; // Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`. - auto operator=(IRStreamReader&&) -> IRStreamReader& = delete; + auto operator=(IrStreamReader&&) -> IrStreamReader& = delete; // Methods /** @@ -100,7 +100,7 @@ class IRStreamReader : public StreamReader { private: // Constructor - explicit IRStreamReader( + explicit IrStreamReader( StreamReaderDataContext&& stream_reader_data_context ); diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp index 6906d7e0..176af4d3 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -15,7 +15,7 @@ #include #include -#include +#include namespace clp_ffi_js::ir { auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr { @@ -34,13 +34,13 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr< auto const version{get_version(*zstd_decompressor)}; if (version == "v0.0.0") { - auto stream_reader_data_context{IRStreamReader::create_deserializer_and_data_context( + auto stream_reader_data_context{IrStreamReader::create_deserializer_and_data_context( std::move(zstd_decompressor), std::move(data_buffer) )}; - return std::unique_ptr( - new IRStreamReader(std::move(stream_reader_data_context)) + return std::unique_ptr( + new IrStreamReader(std::move(stream_reader_data_context)) ); } diff --git a/src/clp_ffi_js/ir/StreamReaderDataContext.hpp b/src/clp_ffi_js/ir/StreamReaderDataContext.hpp index c6f8134c..651f741f 100644 --- a/src/clp_ffi_js/ir/StreamReaderDataContext.hpp +++ b/src/clp_ffi_js/ir/StreamReaderDataContext.hpp @@ -11,7 +11,7 @@ namespace clp_ffi_js::ir { /** - * The data context for a `IRStreamReader`. It encapsulates a chain of the following resources: + * The data context for a `IrStreamReader`. It encapsulates a chain of the following resources: * A `clp::ir::LogEventDeserializer` that reads from a * `clp::streaming_compression::zstd::Decompressor`, which in turn reads from a `clp::Array`. * @tparam encoded_variable_t Type of encoded variables encoded in the stream.