Skip to content

Commit

Permalink
[fix](local exchange) Use tokens to ensure that try_dequeue maintains…
Browse files Browse the repository at this point in the history
… strict order. (#45741)

### What problem does this PR solve?

The previously used moodycamel::ConcurrentQueue does not guarantee that
the enqueue order matches the dequeue order,
even when there is only a single producer and a single consumer.  
Refer to this issue:
cameron314/concurrentqueue#316
We can use tokens to ensure the correct order.

### Release note

None

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
    - [ ] Regression test
    - [ ] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [x] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
        - [x] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason?  -->

- Behavior changed:
    - [x] No.
    - [ ] Yes. <!-- Explain the behavior change -->

- Does this need documentation?
    - [x] No.
- [ ] Yes. <!-- Add document PR link here. eg:
apache/doris-website#1214 -->

### Check List (For Reviewer who merge this PR)

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
  • Loading branch information
Mryange authored and Your Name committed Dec 24, 2024
1 parent 4fb8403 commit f6f160c
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 2 deletions.
5 changes: 3 additions & 2 deletions be/src/pipeline/local_exchange/local_exchanger.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,13 @@ template <typename BlockType>
struct BlockQueue {
std::atomic<bool> eos = false;
moodycamel::ConcurrentQueue<BlockType> data_queue;
moodycamel::ProducerToken ptok {data_queue};
BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
BlockQueue(BlockQueue<BlockType>&& other)
: eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
inline bool enqueue(BlockType const& item) {
if (!eos) {
if (!data_queue.enqueue(item)) [[unlikely]] {
if (!data_queue.enqueue(ptok, item)) [[unlikely]] {
throw Exception(ErrorCode::INTERNAL_ERROR,
"Exception occurs in data queue [size = {}] of local exchange.",
data_queue.size_approx());
Expand All @@ -121,7 +122,7 @@ struct BlockQueue {

inline bool enqueue(BlockType&& item) {
if (!eos) {
if (!data_queue.enqueue(std::move(item))) [[unlikely]] {
if (!data_queue.enqueue(ptok, std::move(item))) [[unlikely]] {
throw Exception(ErrorCode::INTERNAL_ERROR,
"Exception occurs in data queue [size = {}] of local exchange.",
data_queue.size_approx());
Expand Down
109 changes: 109 additions & 0 deletions be/test/vec/exec/concurrent_queue_order.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <concurrentqueue.h>
#include <gtest/gtest.h>

#include <memory>
#include <vector>

namespace doris::vectorized {

class ConcurrentQueueOrder : public testing::Test {
public:
ConcurrentQueueOrder() = default;
~ConcurrentQueueOrder() override = default;
};
// The previously used moodycamel::ConcurrentQueue does not guarantee that the enqueue order matches the dequeue order,
// even when there is only a single producer and a single consumer.
// Refer to this issue: https://github.com/cameron314/concurrentqueue/issues/316
// We can use tokens to ensure the correct order.
TEST_F(ConcurrentQueueOrder, test_not_guarantee_order) {
{
moodycamel::ConcurrentQueue<int> data_queue;
int num = 0;
std::mutex m;
std::atomic_bool flag = true;

auto task = [&](int thread_id) {
while (flag) {
std::lock_guard lc {m};
data_queue.enqueue(num++);
}
};
std::thread input1(task, 0);
std::thread input2(task, 1);
std::thread input3(task, 2);

std::this_thread::sleep_for(std::chrono::milliseconds(50));
flag = false;

input3.join();
input1.join();
input2.join();

std::cout << "queue size " << data_queue.size_approx() << "\n";
std::vector<int> outputs;
int output;
while (data_queue.try_dequeue(output)) {
outputs.push_back(output);
}

EXPECT_FALSE(std::is_sorted(outputs.begin(), outputs.end()));
std::cout << "output is sorted : " << std::is_sorted(outputs.begin(), outputs.end())
<< "\n";
}
}

TEST_F(ConcurrentQueueOrder, test_guarantee_order) {
{
moodycamel::ConcurrentQueue<int> data_queue;
moodycamel::ProducerToken ptok {data_queue};
int num = 0;
std::mutex m;
std::atomic_bool flag = true;

auto task = [&](int thread_id) {
while (flag) {
std::lock_guard lc {m};
data_queue.enqueue(ptok, num++);
}
};
std::thread input1(task, 0);
std::thread input2(task, 1);
std::thread input3(task, 2);

std::this_thread::sleep_for(std::chrono::milliseconds(50));
flag = false;

input3.join();
input1.join();
input2.join();

std::cout << "queue size " << data_queue.size_approx() << "\n";
std::vector<int> outputs;
int output;
while (data_queue.try_dequeue(output)) {
outputs.push_back(output);
}

EXPECT_TRUE(std::is_sorted(outputs.begin(), outputs.end()));
std::cout << "output is sorted : " << std::is_sorted(outputs.begin(), outputs.end())
<< "\n";
}
}
} // namespace doris::vectorized

0 comments on commit f6f160c

Please sign in to comment.