Skip to content

Commit 554e324

Browse files
authored
Merge pull request #6 from eseiler/misc/refine
Use two queues
2 parents c1a6dff + cd22411 commit 554e324

File tree

7 files changed

+153
-157
lines changed

7 files changed

+153
-157
lines changed

include/fpgalign/search/search.hpp

Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,57 +4,27 @@
44

55
#pragma once
66

7-
#include <mutex>
8-
#include <string>
9-
#include <vector>
10-
117
#include <fpgalign/config.hpp>
8+
#include <fpgalign/contrib/slotted_cart_queue.hpp>
129
#include <fpgalign/meta.hpp>
1310

1411
namespace search
1512
{
1613

17-
struct hit
18-
{
19-
std::vector<uint64_t> bins;
20-
};
21-
22-
struct wip_alignment
14+
struct alignment_info
2315
{
2416
size_t bin;
2517
size_t sequence_number;
2618
size_t position;
2719
size_t idx;
2820
};
2921

30-
class alignment_vector
31-
{
32-
private:
33-
std::vector<wip_alignment> data;
34-
std::mutex mtx;
35-
36-
public:
37-
std::vector<wip_alignment> & get() noexcept
38-
{
39-
return data;
40-
}
41-
42-
std::vector<wip_alignment> const & get() const noexcept
43-
{
44-
return data;
45-
}
46-
47-
void emplace_back(wip_alignment elem)
48-
{
49-
std::lock_guard guard{mtx};
50-
data.emplace_back(std::move(elem));
51-
}
52-
53-
void emplace_back(wip_alignment & elem) = delete;
54-
};
55-
5622
void search(config const & config);
57-
std::vector<hit> ibf(config const & config, meta & meta);
58-
std::vector<wip_alignment> fmindex(config const & config, meta & meta, std::vector<hit> hits);
23+
void ibf(config const & config, meta & meta, scq::slotted_cart_queue<size_t> & filter_queue);
24+
void fmindex(config const & config,
25+
meta & meta,
26+
scq::slotted_cart_queue<size_t> & filter_queue,
27+
scq::slotted_cart_queue<alignment_info> & alignment_queue);
28+
void do_alignment(config const & config, meta & meta, scq::slotted_cart_queue<alignment_info> & alignment_queue);
5929

6030
} // namespace search

src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ set (FPGAlign_SOURCE_FILES
1313
search/ibf.cpp
1414
search/fmindex.cpp
1515
search/search.cpp
16+
search/do_alignment.cpp
1617
)
1718

1819
# An object library (without main) to be used in multiple targets.

src/build/fmindex.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ void fmindex(config const & config, meta & meta)
5454
{
5555
read_reference_into(reference, meta, i);
5656

57-
fmc::BiFMIndex<5> index{reference, /*samplingRate*/ 16, config.threads};
57+
fmc::BiFMIndex<5> index{reference, /*samplingRate*/ 16, /*threads*/ 1u};
5858

5959
{
6060
std::ofstream os{fmt::format("{}.{}.fmindex", config.output_path.c_str(), i), std::ios::binary};

src/search/do_alignment.cpp

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
// SPDX-FileCopyrightText: 2006-2025 Knut Reinert & Freie Universität Berlin
2+
// SPDX-FileCopyrightText: 2016-2025 Knut Reinert & MPI für molekulare Genetik
3+
// SPDX-License-Identifier: BSD-3-Clause
4+
5+
#include <seqan3/alignment/cigar_conversion/cigar_from_alignment.hpp>
6+
#include <seqan3/alignment/pairwise/align_pairwise.hpp>
7+
#include <seqan3/alphabet/nucleotide/dna4.hpp>
8+
#include <seqan3/io/sam_file/output.hpp>
9+
10+
#include <fpgalign/search/search.hpp>
11+
12+
namespace search
13+
{
14+
15+
using sam_out_t = seqan3::sam_file_output<seqan3::fields<seqan3::field::seq,
16+
seqan3::field::id,
17+
seqan3::field::ref_id,
18+
seqan3::field::ref_offset,
19+
seqan3::field::cigar,
20+
// seqan3::field::qual,
21+
seqan3::field::mapq>>;
22+
23+
void task(meta & meta, std::span<alignment_info> alignment_infos, sam_out_t & sam_out)
24+
{
25+
static seqan3::configuration const align_config =
26+
seqan3::align_cfg::method_global{seqan3::align_cfg::free_end_gaps_sequence1_leading{true},
27+
seqan3::align_cfg::free_end_gaps_sequence2_leading{false},
28+
seqan3::align_cfg::free_end_gaps_sequence1_trailing{true},
29+
seqan3::align_cfg::free_end_gaps_sequence2_trailing{false}}
30+
| seqan3::align_cfg::edit_scheme | seqan3::align_cfg::output_alignment{}
31+
| seqan3::align_cfg::output_begin_position{} | seqan3::align_cfg::output_score{};
32+
33+
for (auto & [bin, sequence_number, position, idx] : alignment_infos)
34+
{
35+
auto & seq = meta.queries[idx].sequence();
36+
auto seq_view = std::views::transform(seq,
37+
[](seqan3::dna4 const in) -> uint8_t
38+
{
39+
return in.to_rank() + 1u;
40+
});
41+
auto & seq_id = meta.queries[idx].id();
42+
auto & ref = meta.references[bin][sequence_number];
43+
auto & ref_id = meta.ref_ids[bin][sequence_number];
44+
45+
size_t const start = position - static_cast<size_t>(position != 0u);
46+
size_t const length = seq.size();
47+
auto it = std::ranges::next(ref.begin(), start, ref.end());
48+
auto end = std::ranges::next(it, length + 1u, ref.end());
49+
std::span ref_text{it, end};
50+
51+
for (auto && alignment : seqan3::align_pairwise(std::tie(ref_text, seq_view), align_config))
52+
{
53+
auto cigar = seqan3::cigar_from_alignment(alignment.alignment());
54+
size_t ref_offset = alignment.sequence1_begin_position() + 2 + start;
55+
size_t map_qual = 60u + alignment.score();
56+
57+
sam_out.emplace_back(seq,
58+
seq_id,
59+
ref_id,
60+
ref_offset,
61+
cigar,
62+
// record.base_qualities(),
63+
map_qual);
64+
}
65+
}
66+
}
67+
68+
void do_alignment(config const & config, meta & meta, scq::slotted_cart_queue<alignment_info> & alignment_queue)
69+
{
70+
sam_out_t sam_out{config.output_path};
71+
72+
while (true)
73+
{
74+
scq::cart_future<alignment_info> cart = alignment_queue.dequeue();
75+
if (!cart.valid())
76+
return;
77+
78+
task(meta, cart.get().second, sam_out);
79+
}
80+
}
81+
82+
} // namespace search

src/search/fmindex.cpp

Lines changed: 33 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -30,67 +30,48 @@ fmc::BiFMIndex<5> load_index(config const & config, size_t const id)
3030
return index;
3131
}
3232

33-
std::vector<wip_alignment> fmindex(config const & config, meta & meta, std::vector<hit> hits)
33+
void fmindex(config const & config,
34+
meta & meta,
35+
scq::slotted_cart_queue<size_t> & filter_queue,
36+
scq::slotted_cart_queue<alignment_info> & alignment_queue)
3437
{
35-
// todo capacity
36-
// each slot = 1 bin
37-
// a cart is full if it has 5 elements (hits)
38-
alignment_vector res;
38+
#pragma omp parallel num_threads(config.threads)
3939
{
40-
scq::slotted_cart_queue<size_t> queue{
41-
{.slots = meta.number_of_bins, .carts = meta.number_of_bins, .capacity = 5}};
42-
size_t thread_id{};
43-
44-
auto get_thread = [&]()
40+
while (true)
4541
{
46-
return std::jthread(
47-
[&, thread_id = thread_id++]()
42+
scq::cart_future<size_t> cart = filter_queue.dequeue();
43+
if (!cart.valid())
44+
break;
45+
auto [slot, span] = cart.get();
46+
auto index = load_index(config, slot.value);
47+
for (auto idx : span)
48+
{
49+
auto callback = [&](auto cursor, size_t)
4850
{
49-
while (true)
51+
for (auto j : cursor)
5052
{
51-
scq::cart_future<size_t> cart = queue.dequeue();
52-
if (!cart.valid())
53-
return;
54-
auto [slot, span] = cart.get();
55-
auto index = load_index(config, slot.value);
56-
for (auto idx : span)
57-
{
58-
auto callback = [&](auto cursor, size_t)
59-
{
60-
for (auto j : cursor)
61-
{
62-
auto [entry, offset] = index.locate(j);
63-
auto [seqId, pos] = entry;
64-
res.emplace_back(wip_alignment{.bin = slot.value,
65-
.sequence_number = seqId,
66-
.position = pos + offset,
67-
.idx = idx});
68-
}
69-
};
70-
71-
auto seq_view = std::views::transform(meta.queries[idx].sequence(),
72-
[](seqan3::dna4 const in) -> uint8_t
73-
{
74-
return in.to_rank() + 1u;
75-
});
76-
77-
fmc::search<true>(index, seq_view, config.errors, callback);
78-
}
53+
auto [entry, offset] = index.locate(j);
54+
auto [seqId, pos] = entry;
55+
alignment_queue.enqueue(scq::slot_id{0u},
56+
alignment_info{.bin = slot.value,
57+
.sequence_number = seqId,
58+
.position = pos + offset,
59+
.idx = idx});
7960
}
80-
});
81-
};
61+
};
8262

83-
std::vector<std::jthread> worker(config.threads);
84-
std::ranges::generate(worker, get_thread);
63+
auto seq_view = std::views::transform(meta.queries[idx].sequence(),
64+
[](seqan3::dna4 const in) -> uint8_t
65+
{
66+
return in.to_rank() + 1u;
67+
});
8568

86-
for (auto && [idx, hit] : seqan::stl::views::enumerate(hits))
87-
for (auto bin : hit.bins)
88-
queue.enqueue(scq::slot_id{bin}, idx);
89-
90-
queue.close();
91-
} // Wait for threads to finish
69+
fmc::search<true>(index, seq_view, config.errors, callback);
70+
}
71+
}
72+
}
9273

93-
return res.get();
74+
alignment_queue.close();
9475
}
9576

9677
} // namespace search

src/search/ibf.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ threshold::threshold get_thresholder(config const & config, meta const & meta)
3939
using seqfile_t = seqan3::sequence_file_input<dna4_traits, seqan3::fields<seqan3::field::id, seqan3::field::seq>>;
4040
using record_t = typename seqfile_t::record_type;
4141

42-
std::vector<hit> ibf(config const & config, meta & meta)
42+
void ibf(config const & config, meta & meta, scq::slotted_cart_queue<size_t> & filter_queue)
4343
{
4444
seqan::hibf::interleaved_bloom_filter ibf{};
4545

@@ -60,8 +60,6 @@ std::vector<hit> ibf(config const & config, meta & meta)
6060
return result;
6161
}();
6262

63-
std::vector<hit> hits(meta.queries.size());
64-
6563
#pragma omp parallel num_threads(config.threads)
6664
{
6765
auto agent = ibf.membership_agent();
@@ -80,11 +78,14 @@ std::vector<hit> ibf(config const & config, meta & meta)
8078
hashes.assign(view.begin(), view.end());
8179

8280
auto & result = agent.membership_for(hashes, thresholder.get(hashes.size()));
83-
std::ranges::copy(result, std::back_inserter(hits[i].bins));
81+
for (size_t bin : result)
82+
{
83+
filter_queue.enqueue(scq::slot_id{bin}, i);
84+
}
8485
}
8586
}
8687

87-
return hits;
88+
filter_queue.close();
8889
}
8990

9091
} // namespace search

0 commit comments

Comments
 (0)