diff --git a/CMakeLists.txt b/CMakeLists.txt index 753dee44..2d584386 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -112,7 +112,11 @@ target_include_directories( target_include_directories(${CLP_FFI_JS_BIN_NAME} PRIVATE src/) -set(CLP_FFI_JS_SRC_MAIN src/clp_ffi_js/ir/StreamReader.cpp) +set(CLP_FFI_JS_SRC_MAIN + src/clp_ffi_js/ir/decoding_methods.cpp + src/clp_ffi_js/ir/IrStreamReader.cpp + src/clp_ffi_js/ir/StreamReader.cpp +) set(CLP_FFI_JS_SRC_CLP_CORE src/submodules/clp/components/core/src/clp/ffi/ir_stream/decoding_methods.cpp diff --git a/src/clp_ffi_js/ir/IrStreamReader.cpp b/src/clp_ffi_js/ir/IrStreamReader.cpp new file mode 100644 index 00000000..8ec58c2e --- /dev/null +++ b/src/clp_ffi_js/ir/IrStreamReader.cpp @@ -0,0 +1,264 @@ +#include "IrStreamReader.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace std::literals::string_literals; +using clp::ir::four_byte_encoded_variable_t; + +namespace clp_ffi_js::ir { +auto IrStreamReader::create(DataArrayTsType const& data_array) -> IrStreamReader { + auto const length{data_array["length"].as()}; + SPDLOG_INFO("IrStreamReader::create: got buffer of length={}", length); + + // Copy array from JavaScript to C++ + clp::Array data_buffer{length}; + // NOLINTBEGIN(cppcoreguidelines-pro-type-reinterpret-cast) + emscripten::val::module_property("HEAPU8") + .call("set", data_array, reinterpret_cast(data_buffer.data())); + // NOLINTEND(cppcoreguidelines-pro-type-reinterpret-cast) + + auto zstd_decompressor{std::make_unique()}; + zstd_decompressor->open(data_buffer.data(), length); + + auto stream_reader_data_context{create_deserializer_and_data_context( + std::move(zstd_decompressor), + std::move(data_buffer) + )}; + return IrStreamReader{std::move(stream_reader_data_context)}; +} + +auto IrStreamReader::get_num_events_buffered() const -> size_t { + return m_encoded_log_events.size(); +} + +auto IrStreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { + if (false == m_filtered_log_event_map.has_value()) { + return FilteredLogEventMapTsType{emscripten::val::null()}; + } + + return FilteredLogEventMapTsType{emscripten::val::array(m_filtered_log_event_map.value())}; +} + +void IrStreamReader::filter_log_events(emscripten::val const& log_level_filter) { + if (log_level_filter.isNull()) { + m_filtered_log_event_map.reset(); + return; + } + + m_filtered_log_event_map.emplace(); + auto filter_levels{emscripten::vecFromJSArray>(log_level_filter + )}; + for (size_t log_event_idx = 0; log_event_idx < m_encoded_log_events.size(); ++log_event_idx) { + auto const& log_event = m_encoded_log_events[log_event_idx]; + if (std::ranges::find( + filter_levels, + clp::enum_to_underlying_type(log_event.get_log_level()) + ) + != filter_levels.end()) + { + m_filtered_log_event_map->emplace_back(log_event_idx); + } + } +} + +auto IrStreamReader::deserialize_stream() -> size_t { + if (nullptr == m_stream_reader_data_context) { + return m_encoded_log_events.size(); + } + + constexpr size_t cDefaultNumReservedLogEvents{500'000}; + m_encoded_log_events.reserve(cDefaultNumReservedLogEvents); + + while (true) { + auto result{m_stream_reader_data_context->get_deserializer().deserialize_log_event()}; + if (result.has_error()) { + auto const error{result.error()}; + if (std::errc::no_message_available == error) { + break; + } + if (std::errc::result_out_of_range == error) { + SPDLOG_ERROR("File contains an incomplete IR stream"); + break; + } + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Corrupt, + __FILENAME__, + __LINE__, + "Failed to deserialize: "s + error.category().name() + ":" + error.message() + }; + } + auto const& log_event = result.value(); + auto const& message = log_event.get_message(); + + auto const& logtype = message.get_logtype(); + constexpr size_t cLogLevelPositionInMessages{1}; + LogLevel log_level{LogLevel::NONE}; + if (logtype.length() > cLogLevelPositionInMessages) { + // NOLINTNEXTLINE(readability-qualified-auto) + auto const log_level_name_it{std::find_if( + cLogLevelNames.begin() + static_cast(cValidLogLevelsBeginIdx), + cLogLevelNames.end(), + [&](std::string_view level) { + return logtype.substr(cLogLevelPositionInMessages).starts_with(level); + } + )}; + if (log_level_name_it != cLogLevelNames.end()) { + log_level = static_cast( + std::distance(cLogLevelNames.begin(), log_level_name_it) + ); + } + } + + auto log_viewer_event{LogEventWithLevel( + log_event.get_timestamp(), + log_event.get_utc_offset(), + message, + log_level + )}; + m_encoded_log_events.emplace_back(std::move(log_viewer_event)); + } + m_stream_reader_data_context.reset(nullptr); + return m_encoded_log_events.size(); +} + +auto IrStreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType { + if (use_filter && false == m_filtered_log_event_map.has_value()) { + return DecodedResultsTsType{emscripten::val::null()}; + } + + size_t length{0}; + if (use_filter) { + length = m_filtered_log_event_map->size(); + } else { + length = m_encoded_log_events.size(); + } + if (length < end_idx || begin_idx > end_idx) { + return DecodedResultsTsType{emscripten::val::null()}; + } + + std::string message; + constexpr size_t cDefaultReservedMessageLength{512}; + message.reserve(cDefaultReservedMessageLength); + auto const results{emscripten::val::array()}; + + for (size_t i = begin_idx; i < end_idx; ++i) { + size_t log_event_idx{0}; + if (use_filter) { + log_event_idx = m_filtered_log_event_map->at(i); + } else { + log_event_idx = i; + } + auto const& log_event{m_encoded_log_events[log_event_idx]}; + + auto const parsed{log_event.get_message().decode_and_unparse()}; + if (false == parsed.has_value()) { + SPDLOG_ERROR("Failed to decode message."); + break; + } + message = parsed.value(); + + m_ts_pattern.insert_formatted_timestamp(log_event.get_timestamp(), message); + + EM_ASM( + { Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); }, + results.as_handle(), + message.c_str(), + log_event.get_timestamp(), + log_event.get_log_level(), + log_event_idx + 1 + ); + } + + return DecodedResultsTsType(results); +} + +IrStreamReader::IrStreamReader( + StreamReaderDataContext&& stream_reader_data_context +) + : m_stream_reader_data_context{std::make_unique< + StreamReaderDataContext>( + std::move(stream_reader_data_context) + )}, + m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {} + +auto IrStreamReader::create_deserializer_and_data_context( + std::unique_ptr&& zstd_decompressor, + clp::Array&& data_buffer +) -> StreamReaderDataContext { + rewind_reader_and_verify_encoding_type(*zstd_decompressor); + + auto result{ + clp::ir::LogEventDeserializer::create(*zstd_decompressor) + }; + if (result.has_error()) { + auto const error_code{result.error()}; + SPDLOG_CRITICAL( + "Failed to create deserializer: {}:{}", + error_code.category().name(), + error_code.message() + ); + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Failure, + __FILENAME__, + __LINE__, + "Failed to create deserializer" + }; + } + + return {std::move(data_buffer), std::move(zstd_decompressor), std::move(result.value())}; +} +} // namespace clp_ffi_js::ir + +namespace { +EMSCRIPTEN_BINDINGS(ClpIrStreamReader) { + emscripten::class_< + clp_ffi_js::ir::IrStreamReader, + emscripten::base>("ClpIrStreamReader") + .constructor( + &clp_ffi_js::ir::IrStreamReader::create, + emscripten::return_value_policy::take_ownership() + ) + .function( + "getNumEventsBuffered", + &clp_ffi_js::ir::IrStreamReader::get_num_events_buffered + ) + .function( + "getFilteredLogEventMap", + &clp_ffi_js::ir::IrStreamReader::get_filtered_log_event_map + ) + .function("filterLogEvents", &clp_ffi_js::ir::IrStreamReader::filter_log_events) + .function("deserializeStream", &clp_ffi_js::ir::IrStreamReader::deserialize_stream) + .function("decodeRange", &clp_ffi_js::ir::IrStreamReader::decode_range); +} +} // namespace diff --git a/src/clp_ffi_js/ir/IrStreamReader.hpp b/src/clp_ffi_js/ir/IrStreamReader.hpp new file mode 100644 index 00000000..768742c6 --- /dev/null +++ b/src/clp_ffi_js/ir/IrStreamReader.hpp @@ -0,0 +1,122 @@ +#ifndef CLP_FFI_JS_IR_IR_STREAM_READER_HPP +#define CLP_FFI_JS_IR_IR_STREAM_READER_HPP + +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include + +using clp::ir::four_byte_encoded_variable_t; + +namespace clp_ffi_js::ir { +/** + * Class to deserialize and decode Zstandard-compressed CLP IR streams as well as format decoded + * log events. + */ +class IrStreamReader : public StreamReader { + friend StreamReader; + +public: + /** + * Mapping between an index in the filtered log events collection to an index in the unfiltered + * log events collection. + */ + using FilteredLogEventsMap = std::optional>; + + /** + * Creates a IrStreamReader to read from the given array. + * + * @param data_array An array containing a Zstandard-compressed IR stream. + * @return The created instance. + * @throw ClpFfiJsException if any error occurs. + */ + [[nodiscard]] static auto create(DataArrayTsType const& data_array) -> IrStreamReader; + + // Destructor + ~IrStreamReader() override = default; + + // Disable copy constructor and assignment operator + IrStreamReader(IrStreamReader const&) = delete; + auto operator=(IrStreamReader const&) -> IrStreamReader& = delete; + + // Define default move constructor + IrStreamReader(IrStreamReader&&) = default; + // Delete move assignment operator since it's also disabled in `clp::ir::LogEventDeserializer`. + auto operator=(IrStreamReader&&) -> IrStreamReader& = delete; + + // Methods + /** + * @return The number of events buffered. + */ + [[nodiscard]] auto get_num_events_buffered() const -> size_t override; + + /** + * @return The filtered log events map. + */ + [[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType override; + + /** + * Generates a filtered collection from all log events. + * + * @param log_level_filter Array of selected log levels + */ + void filter_log_events(emscripten::val const& log_level_filter) override; + + /** + * Deserializes all log events in the stream. After the stream has been exhausted, it will be + * deallocated. + * + * @return The number of successfully deserialized ("valid") log events. + */ + [[nodiscard]] auto deserialize_stream() -> size_t override; + + /** + * Decodes log events in the range `[beginIdx, endIdx)` of the filtered or unfiltered + * (depending on the value of `useFilter`) log events collection. + * + * @param begin_idx + * @param end_idx + * @param use_filter Whether to decode from the filtered or unfiltered log events collection. + * @return An array where each element is a decoded log event represented by an array of: + * - The log event's message + * - The log event's timestamp as milliseconds since the Unix epoch + * - The log event's log level as an integer that indexes into `cLogLevelNames` + * - The log event's number (1-indexed) in the stream + * @return null if any log event in the range doesn't exist (e.g. the range exceeds the number + * of log events in the collection). + */ + [[nodiscard]] auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType override; + +private: + // Constructor + explicit IrStreamReader( + StreamReaderDataContext&& stream_reader_data_context + ); + + // Methods + [[nodiscard]] static auto create_deserializer_and_data_context( + std::unique_ptr&& zstd_decompressor, + clp::Array&& data_buffer + ) -> StreamReaderDataContext; + + // Variables + std::vector> m_encoded_log_events; + std::unique_ptr> + m_stream_reader_data_context; + FilteredLogEventsMap m_filtered_log_event_map; + clp::TimestampPattern m_ts_pattern; +}; +} // namespace clp_ffi_js::ir + +#endif // CLP_FFI_JS_IR_IR_STREAM_READER_HPP diff --git a/src/clp_ffi_js/ir/StreamReader.cpp b/src/clp_ffi_js/ir/StreamReader.cpp index b9c86b6b..176af4d3 100644 --- a/src/clp_ffi_js/ir/StreamReader.cpp +++ b/src/clp_ffi_js/ir/StreamReader.cpp @@ -1,42 +1,26 @@ #include "StreamReader.hpp" -#include #include #include -#include +#include +#include #include -#include -#include -#include -#include -#include #include -#include #include -#include -#include -#include -#include #include #include -#include -#include -#include +#include #include #include -#include -#include -#include - -using namespace std::literals::string_literals; -using clp::ir::four_byte_encoded_variable_t; +#include +#include namespace clp_ffi_js::ir { -auto StreamReader::create(DataArrayTsType const& data_array) -> StreamReader { +auto StreamReader::create(DataArrayTsType const& data_array) -> std::unique_ptr { auto const length{data_array["length"].as()}; - SPDLOG_INFO("StreamReader::create: got buffer of length={}", length); + SPDLOG_INFO("KVPairIRStreamReader::create: got buffer of length={}", length); // Copy array from JavaScript to C++ clp::Array data_buffer{length}; @@ -48,234 +32,39 @@ auto StreamReader::create(DataArrayTsType const& data_array) -> StreamReader { auto zstd_decompressor{std::make_unique()}; zstd_decompressor->open(data_buffer.data(), length); - bool is_four_bytes_encoding{true}; - if (auto const err{ - clp::ffi::ir_stream::get_encoding_type(*zstd_decompressor, is_four_bytes_encoding) - }; - clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err) - { - SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err); - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_MetadataCorrupted, - __FILENAME__, - __LINE__, - "Failed to decode encoding type." - }; - } - if (false == is_four_bytes_encoding) { - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Unsupported, - __FILENAME__, - __LINE__, - "IR stream uses unsupported encoding." - }; - } - - auto result{ - clp::ir::LogEventDeserializer::create(*zstd_decompressor) - }; - if (result.has_error()) { - auto const error_code{result.error()}; - SPDLOG_CRITICAL( - "Failed to create deserializer: {}:{}", - error_code.category().name(), - error_code.message() - ); - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Failure, - __FILENAME__, - __LINE__, - "Failed to create deserializer" - }; - } - - StreamReaderDataContext stream_reader_data_context{ - std::move(data_buffer), - std::move(zstd_decompressor), - std::move(result.value()) - }; - return StreamReader{std::move(stream_reader_data_context)}; -} - -auto StreamReader::get_num_events_buffered() const -> size_t { - return m_encoded_log_events.size(); -} - -auto StreamReader::get_filtered_log_event_map() const -> FilteredLogEventMapTsType { - if (false == m_filtered_log_event_map.has_value()) { - return FilteredLogEventMapTsType{emscripten::val::null()}; - } - - return FilteredLogEventMapTsType{emscripten::val::array(m_filtered_log_event_map.value())}; -} - -void StreamReader::filter_log_events(emscripten::val const& log_level_filter) { - if (log_level_filter.isNull()) { - m_filtered_log_event_map.reset(); - return; - } - - m_filtered_log_event_map.emplace(); - auto filter_levels{emscripten::vecFromJSArray>(log_level_filter - )}; - for (size_t log_event_idx = 0; log_event_idx < m_encoded_log_events.size(); ++log_event_idx) { - auto const& log_event = m_encoded_log_events[log_event_idx]; - if (std::ranges::find( - filter_levels, - clp::enum_to_underlying_type(log_event.get_log_level()) - ) - != filter_levels.end()) - { - m_filtered_log_event_map->emplace_back(log_event_idx); - } - } -} - -auto StreamReader::deserialize_stream() -> size_t { - if (nullptr == m_stream_reader_data_context) { - return m_encoded_log_events.size(); - } - - constexpr size_t cDefaultNumReservedLogEvents{500'000}; - m_encoded_log_events.reserve(cDefaultNumReservedLogEvents); - - while (true) { - auto result{m_stream_reader_data_context->get_deserializer().deserialize_log_event()}; - if (result.has_error()) { - auto const error{result.error()}; - if (std::errc::no_message_available == error) { - break; - } - if (std::errc::result_out_of_range == error) { - SPDLOG_ERROR("File contains an incomplete IR stream"); - break; - } - throw ClpFfiJsException{ - clp::ErrorCode::ErrorCode_Corrupt, - __FILENAME__, - __LINE__, - "Failed to deserialize: "s + error.category().name() + ":" + error.message() - }; - } - auto const& log_event = result.value(); - auto const& message = log_event.get_message(); - - auto const& logtype = message.get_logtype(); - constexpr size_t cLogLevelPositionInMessages{1}; - LogLevel log_level{LogLevel::NONE}; - if (logtype.length() > cLogLevelPositionInMessages) { - // NOLINTNEXTLINE(readability-qualified-auto) - auto const log_level_name_it{std::find_if( - cLogLevelNames.begin() + static_cast(cValidLogLevelsBeginIdx), - cLogLevelNames.end(), - [&](std::string_view level) { - return logtype.substr(cLogLevelPositionInMessages).starts_with(level); - } - )}; - if (log_level_name_it != cLogLevelNames.end()) { - log_level = static_cast( - std::distance(cLogLevelNames.begin(), log_level_name_it) - ); - } - } - - auto log_viewer_event{LogEventWithLevel( - log_event.get_timestamp(), - log_event.get_utc_offset(), - message, - log_level + auto const version{get_version(*zstd_decompressor)}; + if (version == "v0.0.0") { + auto stream_reader_data_context{IrStreamReader::create_deserializer_and_data_context( + std::move(zstd_decompressor), + std::move(data_buffer) )}; - m_encoded_log_events.emplace_back(std::move(log_viewer_event)); - } - m_stream_reader_data_context.reset(nullptr); - return m_encoded_log_events.size(); -} - -auto StreamReader::decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const - -> DecodedResultsTsType { - if (use_filter && false == m_filtered_log_event_map.has_value()) { - return DecodedResultsTsType{emscripten::val::null()}; - } - - size_t length{0}; - if (use_filter) { - length = m_filtered_log_event_map->size(); - } else { - length = m_encoded_log_events.size(); - } - if (length < end_idx || begin_idx > end_idx) { - return DecodedResultsTsType{emscripten::val::null()}; - } - - std::string message; - constexpr size_t cDefaultReservedMessageLength{512}; - message.reserve(cDefaultReservedMessageLength); - auto const results{emscripten::val::array()}; - for (size_t i = begin_idx; i < end_idx; ++i) { - size_t log_event_idx{0}; - if (use_filter) { - log_event_idx = m_filtered_log_event_map->at(i); - } else { - log_event_idx = i; - } - auto const& log_event{m_encoded_log_events[log_event_idx]}; - - auto const parsed{log_event.get_message().decode_and_unparse()}; - if (false == parsed.has_value()) { - SPDLOG_ERROR("Failed to decode message."); - break; - } - message = parsed.value(); - - m_ts_pattern.insert_formatted_timestamp(log_event.get_timestamp(), message); - - EM_ASM( - { Emval.toValue($0).push([UTF8ToString($1), $2, $3, $4]); }, - results.as_handle(), - message.c_str(), - log_event.get_timestamp(), - log_event.get_log_level(), - log_event_idx + 1 + return std::unique_ptr( + new IrStreamReader(std::move(stream_reader_data_context)) ); } - return DecodedResultsTsType(results); + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Unsupported, + __FILENAME__, + __LINE__, + std::format("Unable to create stream reader for IR data with version {}.", version) + }; } - -StreamReader::StreamReader( - StreamReaderDataContext&& stream_reader_data_context -) - : m_stream_reader_data_context{std::make_unique< - StreamReaderDataContext>( - std::move(stream_reader_data_context) - )}, - m_ts_pattern{m_stream_reader_data_context->get_deserializer().get_timestamp_pattern()} {} } // namespace clp_ffi_js::ir namespace { -EMSCRIPTEN_BINDINGS(ClpIrStreamReader) { +EMSCRIPTEN_BINDINGS(ClpStreamReader) { emscripten::register_type("Uint8Array"); emscripten::register_type( "Array<[string, number, number, number]>" ); emscripten::register_type("number[] | null"); - emscripten::class_("ClpIrStreamReader") + emscripten::class_("ClpStreamReader") .constructor( &clp_ffi_js::ir::StreamReader::create, emscripten::return_value_policy::take_ownership() - ) - .function( - "getNumEventsBuffered", - &clp_ffi_js::ir::StreamReader::get_num_events_buffered - ) - .function( - "getFilteredLogEventMap", - &clp_ffi_js::ir::StreamReader::get_filtered_log_event_map - ) - .function("filterLogEvents", &clp_ffi_js::ir::StreamReader::filter_log_events) - .function("deserializeStream", &clp_ffi_js::ir::StreamReader::deserialize_stream) - .function("decodeRange", &clp_ffi_js::ir::StreamReader::decode_range); + ); } } // namespace diff --git a/src/clp_ffi_js/ir/StreamReader.hpp b/src/clp_ffi_js/ir/StreamReader.hpp index dec6c360..ad9cbaa0 100644 --- a/src/clp_ffi_js/ir/StreamReader.hpp +++ b/src/clp_ffi_js/ir/StreamReader.hpp @@ -3,17 +3,9 @@ #include #include -#include -#include -#include -#include -#include #include -#include -#include - namespace clp_ffi_js::ir { EMSCRIPTEN_DECLARE_VAL_TYPE(DataArrayTsType); EMSCRIPTEN_DECLARE_VAL_TYPE(DecodedResultsTsType); @@ -25,12 +17,6 @@ EMSCRIPTEN_DECLARE_VAL_TYPE(FilteredLogEventMapTsType); */ class StreamReader { public: - /** - * Mapping between an index in the filtered log events collection to an index in the unfiltered - * log events collection. - */ - using FilteredLogEventsMap = std::optional>; - /** * Creates a StreamReader to read from the given array. * @@ -38,10 +24,11 @@ class StreamReader { * @return The created instance. * @throw ClpFfiJsException if any error occurs. */ - [[nodiscard]] static auto create(DataArrayTsType const& data_array) -> StreamReader; + [[nodiscard]] static auto create(DataArrayTsType const& data_array + ) -> std::unique_ptr; // Destructor - ~StreamReader() = default; + virtual ~StreamReader() = default; // Disable copy constructor and assignment operator StreamReader(StreamReader const&) = delete; @@ -55,19 +42,19 @@ class StreamReader { /** * @return The number of events buffered. */ - [[nodiscard]] auto get_num_events_buffered() const -> size_t; + [[nodiscard]] virtual auto get_num_events_buffered() const -> size_t = 0; /** * @return The filtered log events map. */ - [[nodiscard]] auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType; + [[nodiscard]] virtual auto get_filtered_log_event_map() const -> FilteredLogEventMapTsType = 0; /** * Generates a filtered collection from all log events. * * @param log_level_filter Array of selected log levels */ - void filter_log_events(emscripten::val const& log_level_filter); + virtual void filter_log_events(emscripten::val const& log_level_filter) = 0; /** * Deserializes all log events in the stream. After the stream has been exhausted, it will be @@ -75,7 +62,7 @@ class StreamReader { * * @return The number of successfully deserialized ("valid") log events. */ - [[nodiscard]] auto deserialize_stream() -> size_t; + [[nodiscard]] virtual auto deserialize_stream() -> size_t = 0; /** * Decodes log events in the range `[beginIdx, endIdx)` of the filtered or unfiltered @@ -92,21 +79,11 @@ class StreamReader { * @return null if any log event in the range doesn't exist (e.g. the range exceeds the number * of log events in the collection). */ - [[nodiscard]] auto - decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const -> DecodedResultsTsType; - -private: - // Constructor - explicit StreamReader(StreamReaderDataContext&& - stream_reader_data_context); + [[nodiscard]] virtual auto decode_range(size_t begin_idx, size_t end_idx, bool use_filter) const + -> DecodedResultsTsType = 0; - // Variables - std::vector> m_encoded_log_events; - std::unique_ptr> - m_stream_reader_data_context; - FilteredLogEventsMap m_filtered_log_event_map; - clp::TimestampPattern m_ts_pattern; +protected: + explicit StreamReader() = default; }; } // namespace clp_ffi_js::ir - #endif // CLP_FFI_JS_IR_STREAM_READER_HPP diff --git a/src/clp_ffi_js/ir/StreamReaderDataContext.hpp b/src/clp_ffi_js/ir/StreamReaderDataContext.hpp index 091b0b05..651f741f 100644 --- a/src/clp_ffi_js/ir/StreamReaderDataContext.hpp +++ b/src/clp_ffi_js/ir/StreamReaderDataContext.hpp @@ -11,7 +11,7 @@ namespace clp_ffi_js::ir { /** - * The data context for a `StreamReader`. It encapsulates a chain of the following resources: + * The data context for a `IrStreamReader`. It encapsulates a chain of the following resources: * A `clp::ir::LogEventDeserializer` that reads from a * `clp::streaming_compression::zstd::Decompressor`, which in turn reads from a `clp::Array`. * @tparam encoded_variable_t Type of encoded variables encoded in the stream. diff --git a/src/clp_ffi_js/ir/decoding_methods.cpp b/src/clp_ffi_js/ir/decoding_methods.cpp new file mode 100644 index 00000000..81063f6a --- /dev/null +++ b/src/clp_ffi_js/ir/decoding_methods.cpp @@ -0,0 +1,81 @@ +#include "decoding_methods.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace clp_ffi_js::ir { +auto get_version(clp::ReaderInterface& reader) -> std::string { + // The encoding type bytes must be consumed before the metadata can be read. + rewind_reader_and_verify_encoding_type(reader); + + // Deserialize metadata bytes from preamble. + clp::ffi::ir_stream::encoded_tag_t metadata_type{}; + std::vector metadata_bytes; + auto const deserialize_preamble_result{ + clp::ffi::ir_stream::deserialize_preamble(reader, metadata_type, metadata_bytes) + }; + if (clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != deserialize_preamble_result) { + SPDLOG_CRITICAL( + "Failed to deserialize preamble for version reading: {}", + deserialize_preamble_result + ); + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Failure, + __FILENAME__, + __LINE__, + "Failed to deserialize preamble for version reading." + }; + } + + // Deserialize metadata bytes which is encoded in JSON. + std::string_view const metadata_view{ + clp::size_checked_pointer_cast(metadata_bytes.data()), + metadata_bytes.size() + }; + nlohmann::json const metadata = nlohmann::json::parse(metadata_view); + + // Retrieve version from metadata. + auto const& version{metadata.at(clp::ffi::ir_stream::cProtocol::Metadata::VersionKey)}; + SPDLOG_INFO("The version is {}", version); + + return version; +} + +auto rewind_reader_and_verify_encoding_type(clp::ReaderInterface& reader) -> void { + reader.seek_from_begin(0); + + bool is_four_bytes_encoding{true}; + if (auto const err{clp::ffi::ir_stream::get_encoding_type(reader, is_four_bytes_encoding)}; + clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success != err) + { + SPDLOG_CRITICAL("Failed to decode encoding type, err={}", err); + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_MetadataCorrupted, + __FILENAME__, + __LINE__, + "Failed to decode encoding type." + }; + } + if (false == is_four_bytes_encoding) { + throw ClpFfiJsException{ + clp::ErrorCode::ErrorCode_Unsupported, + __FILENAME__, + __LINE__, + "IR stream uses unsupported encoding." + }; + } +} +} // namespace clp_ffi_js::ir diff --git a/src/clp_ffi_js/ir/decoding_methods.hpp b/src/clp_ffi_js/ir/decoding_methods.hpp new file mode 100644 index 00000000..6791fd21 --- /dev/null +++ b/src/clp_ffi_js/ir/decoding_methods.hpp @@ -0,0 +1,13 @@ +#ifndef CLP_FFI_JS_IR_DECODING_METHODS_HPP +#define CLP_FFI_JS_IR_DECODING_METHODS_HPP + +#include + +#include + +namespace clp_ffi_js::ir { +auto get_version(clp::ReaderInterface& reader) -> std::string; +auto rewind_reader_and_verify_encoding_type(clp::ReaderInterface& reader) -> void; +} // namespace clp_ffi_js::ir + +#endif // CLP_FFI_JS_IR_DECODING_METHODS_HPP