parallel file parsing
hmusta committed Oct 11, 2024
1 parent a0ca81d commit 97bad90
Showing 1 changed file with 34 additions and 5 deletions.
metagraph/src/graph/representation/hash/dbg_sshash.cpp (34 additions, 5 deletions)
@@ -17,6 +17,8 @@
 namespace mtg {
 namespace graph {
 
+const size_t kBufferSize = 10'000'000;
+
 #if _PROTEIN_GRAPH
 static constexpr uint16_t bits_per_char = sshash::aa_uint_kmer_t<uint64_t>::bits_per_char;
 #else
@@ -103,19 +105,21 @@ DBGSSHash::DBGSSHash(const std::string &input_filename,
     common::logger->trace("Marking colour boundaries");
     sdsl::bit_vector id(num_nodes_);
     {
+        ThreadPool thread_pool(get_num_threads());
         ProgressBar progress_bar(num_nodes(),
                                  "Parsing sequences",
                                  std::cerr, !common::get_verbose());
-        seq_io::read_fasta_file_critical(input_filename, [&](auto *read_stream) {
-            auto indices = mtg::graph::map_to_nodes_sequentially(*this, std::string(read_stream->seq.s, read_stream->seq.l));
+
+        auto process_sequence([&](const std::string &seq, const std::string &comment) {
+            auto indices = mtg::graph::map_to_nodes_sequentially(*this, seq);
             assert(std::equal(indices.begin(), indices.end() - 1,
                               indices.begin() + 1, indices.end(),
                               [](node_index a, node_index b) { return a + 1 == b; }));
             size_t pos = graph_index_to_sshash(indices[0]);
             id[pos] = true;
-            auto split_string = utils::split_string(std::string(read_stream->comment.s, read_stream->comment.l), " ");
+            auto split_string = utils::split_string(comment, " ");
             size_t old_pos = pos;
-            size_t nkmers = read_stream->seq.l - k_ + 1;
+            size_t nkmers = seq.size() - k_ + 1;
 
             for (const auto &part : split_string) {
                 if (part[0] == 'C') {
@@ -130,9 +134,34 @@ DBGSSHash::DBGSSHash(const std::string &input_filename,
             pos += nkmers;
 
             assert(pos == old_pos + nkmers);
+            return nkmers;
+        });
 
-            progress_bar += nkmers;
+        BatchAccumulator<std::pair<std::string, std::string>> batcher(
+            [&](std::vector<std::pair<std::string, std::string>>&& buffer) {
+                thread_pool.enqueue([&](auto&& buffer) {
+                    size_t total_kmers = 0;
+                    for (const auto &[seq, comment] : buffer) {
+                        total_kmers += process_sequence(seq, comment);
+                    }
+                    progress_bar += total_kmers;
+                }, std::move(buffer));
+            },
+            std::numeric_limits<size_t>::max(),
+            kBufferSize
+        );
+
+        seq_io::read_fasta_file_critical(input_filename, [&](auto *read_stream) {
+            batcher.push_and_pay(
+                read_stream->seq.l + read_stream->comment.l,
+                std::make_pair(
+                    std::string(read_stream->seq.s, read_stream->seq.l),
+                    std::string(read_stream->comment.s, read_stream->comment.l)
+                )
+            );
         });
+
+        thread_pool.join();
     }
 
     size_t update_batch = 10000;
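The diff replaces the single-threaded FASTA pass with a producer/consumer scheme: the reader accumulates (sequence, comment) pairs until their combined size reaches kBufferSize, then hands the whole batch to a thread pool, where process_sequence is run on each record and the progress bar advances once per batch. Below is a minimal, self-contained sketch of that batching pattern using only the C++ standard library; the real code uses metagraph's ThreadPool and BatchAccumulator, so the record type, flush threshold, and per-record "work" here are illustrative only.

// Sketch: batch records by accumulated size and hand full batches to worker
// threads, mirroring the BatchAccumulator + ThreadPool flow in the diff above.
#include <cstddef>
#include <future>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

int main() {
    using Record = std::pair<std::string, std::string>;  // (sequence, comment)

    const size_t kBufferSize = 32;  // flush once a batch holds this many characters (illustrative)

    std::vector<Record> batch;
    size_t batch_cost = 0;
    std::vector<std::future<size_t>> workers;

    // Consumer: process one batch on its own thread and report how much work was done.
    auto process_batch = [](std::vector<Record> buffer) {
        size_t total = 0;
        for (const auto &[seq, comment] : buffer) {
            (void)comment;        // colour metadata would be parsed here
            total += seq.size();  // stand-in for process_sequence(seq, comment)
        }
        return total;
    };

    // Dispatch the current batch to a worker and start a fresh one.
    auto flush = [&]() {
        if (batch.empty())
            return;
        workers.push_back(std::async(std::launch::async, process_batch, std::move(batch)));
        batch = {};
        batch_cost = 0;
    };

    // Producer: stream records and "pay" for each one by its size (cf. push_and_pay).
    std::vector<Record> input = { {"ACGTACGT", "C:1"}, {"GGGTTTAACC", "C:2"}, {"TTTTGGGG", "C:3"} };
    for (auto &rec : input) {
        batch_cost += rec.first.size() + rec.second.size();
        batch.push_back(std::move(rec));
        if (batch_cost >= kBufferSize)
            flush();
    }
    flush();  // don't lose the final partial batch

    // Join: collect results from all workers (cf. thread_pool.join()).
    size_t total = 0;
    for (auto &f : workers)
        total += f.get();
    std::cout << "processed " << total << " sequence characters\n";
    return 0;
}

Bounding each batch by total character count rather than record count keeps memory use predictable for mixed short and long sequences while still amortizing the per-task dispatch overhead across many records.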
