diff --git a/metagraph/src/graph/representation/hash/dbg_sshash.cpp b/metagraph/src/graph/representation/hash/dbg_sshash.cpp index 1840cfedbe..db9d7c68a2 100644 --- a/metagraph/src/graph/representation/hash/dbg_sshash.cpp +++ b/metagraph/src/graph/representation/hash/dbg_sshash.cpp @@ -17,6 +17,8 @@ namespace mtg { namespace graph { +const size_t kBufferSize = 10'000'000; + #if _PROTEIN_GRAPH static constexpr uint16_t bits_per_char = sshash::aa_uint_kmer_t::bits_per_char; #else @@ -103,19 +105,21 @@ DBGSSHash::DBGSSHash(const std::string &input_filename, common::logger->trace("Marking colour boundaries"); sdsl::bit_vector id(num_nodes_); { + ThreadPool thread_pool(get_num_threads()); ProgressBar progress_bar(num_nodes(), "Parsing sequences", std::cerr, !common::get_verbose()); - seq_io::read_fasta_file_critical(input_filename, [&](auto *read_stream) { - auto indices = mtg::graph::map_to_nodes_sequentially(*this, std::string(read_stream->seq.s, read_stream->seq.l)); + + auto process_sequence([&](const std::string &seq, const std::string &comment) { + auto indices = mtg::graph::map_to_nodes_sequentially(*this, seq); assert(std::equal(indices.begin(), indices.end() - 1, indices.begin() + 1, indices.end(), [](node_index a, node_index b) { return a + 1 == b; })); size_t pos = graph_index_to_sshash(indices[0]); id[pos] = true; - auto split_string = utils::split_string(std::string(read_stream->comment.s, read_stream->comment.l), " "); + auto split_string = utils::split_string(comment, " "); size_t old_pos = pos; - size_t nkmers = read_stream->seq.l - k_ + 1; + size_t nkmers = seq.size() - k_ + 1; for (const auto &part : split_string) { if (part[0] == 'C') { @@ -130,9 +134,34 @@ DBGSSHash::DBGSSHash(const std::string &input_filename, pos += nkmers; assert(pos == old_pos + nkmers); + return nkmers; + }); - progress_bar += nkmers; + BatchAccumulator> batcher( + [&](std::vector>&& buffer) { + thread_pool.enqueue([&](auto&& buffer) { + size_t total_kmers = 0; + for (const auto &[seq, comment] : buffer) { + total_kmers += process_sequence(seq, comment); + } + progress_bar += total_kmers; + }, std::move(buffer)); + }, + std::numeric_limits::max(), + kBufferSize + ); + + seq_io::read_fasta_file_critical(input_filename, [&](auto *read_stream) { + batcher.push_and_pay( + read_stream->seq.l + read_stream->comment.l, + std::make_pair( + std::string(read_stream->seq.s, read_stream->seq.l), + std::string(read_stream->comment.s, read_stream->comment.l) + ) + ); }); + + thread_pool.join(); } size_t update_batch = 10000;