Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi sched read #55

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 76 additions & 34 deletions src/rt/sched/behaviourcore.h
Original file line number Diff line number Diff line change
Expand Up @@ -519,29 +519,34 @@ namespace verona::rt
Cown::acquire(cown);
}

static std::tuple<size_t, size_t>
handle_read_only_enqueue(Slot* prev_slot, Slot* new_slot, Cown* cown)
static std::tuple<size_t, size_t> handle_read_only_enqueue(
Slot* prev_slot,
Slot* chain_first_slot,
Slot* chain_last_slot,
bool all_reads,
size_t first_consecutive_readers_count,
Cown* cown)
{
size_t ref_count = 0, ex_count = 0;
bool first_reader;

if (prev_slot && (prev_slot->set_next_slot_reader(new_slot)))
if (prev_slot && (prev_slot->set_next_slot_reader(chain_first_slot)))
{
Logging::cout() << " Previous slot is a writer or blocked reader cown "
<< *new_slot << Logging::endl;
<< *chain_first_slot << Logging::endl;
yield();
goto fn_out;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if direct returns in this code would be clearer.

Suggested change
goto fn_out;
return {0, 0};

I'm also not sure I get the usage of ex_count here. This is that all the reads in this chain have seen a READ_AVAILABLE, so they can all decrease there execution count by 1?

}

yield();
first_reader = cown->read_ref_count.add_read();
Logging::cout() << " Reader got the cown " << *new_slot << Logging::endl;
first_reader =
cown->read_ref_count.add_read(first_consecutive_readers_count);
Logging::cout() << " Reader got the cown " << *chain_first_slot
<< Logging::endl;
yield();

// TODO: This will not be correct in the multi-schedule cases.
// There needs to be a check that ensures the chain contains only
// reads. This will be calculated in the prepare phase.
new_slot->set_read_available();
if (all_reads)
chain_last_slot->set_read_available();

ex_count = 1;
if (first_reader)
Expand Down Expand Up @@ -790,12 +795,15 @@ namespace verona::rt
{
Cown* cown;
size_t first_body_index;
Slot* first_slot;
Slot* last_slot;
size_t transfer_count;
bool had_no_predecessor;
// The last two are only use for reads only chains
size_t ref_count;
size_t ex_count;
bool all_reads;
size_t first_consecutive_readers_count;
};
size_t i = 0;
size_t chain_count = 0;
Expand All @@ -807,14 +815,19 @@ namespace verona::rt
auto body = bodies[std::get<0>(cown_to_behaviour_slot_map[i])];
auto last_slot = std::get<1>(cown_to_behaviour_slot_map[i]);
size_t first_body_index = std::get<0>(cown_to_behaviour_slot_map[i]);
auto first_slot = last_slot;

// The number of RCs provided for the current cown by the when.
// I.e. how many moves of cown_refs there were.
size_t transfer_count = last_slot->is_move();

auto all_reads = last_slot->is_read_only();

Logging::cout() << "Processing " << cown << " " << body << " "
<< last_slot << " Index " << i << Logging::endl;

Comment on lines 816 to 828
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this would be a little clearer:

Suggested change
auto last_slot = std::get<1>(cown_to_behaviour_slot_map[i]);
size_t first_body_index = std::get<0>(cown_to_behaviour_slot_map[i]);
auto first_slot = last_slot;
// The number of RCs provided for the current cown by the when.
// I.e. how many moves of cown_refs there were.
size_t transfer_count = last_slot->is_move();
auto all_reads = last_slot->is_read_only();
Logging::cout() << "Processing " << cown << " " << body << " "
<< last_slot << " Index " << i << Logging::endl;
auto first_slot = std::get<1>(cown_to_behaviour_slot_map[i]);
size_t first_body_index = std::get<0>(cown_to_behaviour_slot_map[i]);
// The number of RCs provided for the current cown by the when.
// I.e. how many moves of cown_refs there were.
size_t transfer_count = first_slot->is_move();
auto all_reads = first_slot->is_read_only();
Logging::cout() << "Processing " << cown << " " << body << " "
<< first_slot << " Index " << i << Logging::endl;
auto last_slot = first_slot;

I also wonder if we should rename last_slot to curr_slot? That makes it clear it is varying during the loop.

size_t first_consecutive_readers_count = all_reads;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this would be slightly clearer?

Suggested change
size_t first_consecutive_readers_count = all_reads;
size_t first_consecutive_readers_count = all_reads? 1 : 0;


// Detect duplicates for this cown.
// This is required in two cases:
// * overlaps within a single behaviour.
Expand All @@ -824,6 +837,7 @@ namespace verona::rt
// If the body is the same, then we have an overlap within a single
// behaviour.
auto body_next = bodies[std::get<0>(cown_to_behaviour_slot_map[i])];
auto slot_next = std::get<1>(cown_to_behaviour_slot_map[i]);
if (body_next == body)
{
// Check if the caller passed an RC and add to the total.
Expand All @@ -841,27 +855,39 @@ namespace verona::rt
continue;
}

// For writers, create a chain of behaviours
if (!std::get<1>(cown_to_behaviour_slot_map[i])->is_read_only())
if (slot_next->is_read_only())
{
body = body_next;

// Extend the chain of behaviours linking on this behaviour
last_slot->set_next_slot_writer(body);
last_slot->set_ready();
last_slot->set_next_slot_reader(slot_next);
if (all_reads)
first_consecutive_readers_count++;
}
else
{
all_reads = false;

last_slot = std::get<1>(cown_to_behaviour_slot_map[i]);
continue;
// Extend the chain of behaviours linking on this behaviour
last_slot->set_next_slot_writer(body_next);
}

// TODO: Chain with reads and writes is not implemented.
abort();
last_slot->set_ready();
body = body_next;
last_slot = slot_next;
}

// For each chain you need the cown, the first and the last body of the
// chain
chain_info[chain_count++] = {
cown, first_body_index, last_slot, transfer_count, false, 0, 0};
cown,
first_body_index,
first_slot,
last_slot,
transfer_count,
false,
0,
0,
all_reads,
first_consecutive_readers_count};
Comment on lines +881 to +890
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the comment


// Mark the slot as ready for scheduling
last_slot->reset_status();
Expand All @@ -876,19 +902,26 @@ namespace verona::rt
auto* cown = chain_info[i].cown;
auto first_body_index = chain_info[i].first_body_index;
auto* first_body = bodies[first_body_index];
auto* new_slot = chain_info[i].last_slot;
auto* chain_last_slot = chain_info[i].last_slot;
auto* chain_first_slot = chain_info[i].first_slot;

auto prev_slot =
cown->last_slot.exchange(new_slot, std::memory_order_acq_rel);
cown->last_slot.exchange(chain_last_slot, std::memory_order_acq_rel);

yield();

if (prev_slot == nullptr)
{
chain_info[i].had_no_predecessor = true;
if (new_slot->is_read_only())
if (chain_first_slot->is_read_only())
{
auto counts = handle_read_only_enqueue(prev_slot, new_slot, cown);
auto counts = handle_read_only_enqueue(
prev_slot,
chain_first_slot,
chain_last_slot,
chain_info[i].all_reads,
chain_info[i].first_consecutive_readers_count,
cown);
chain_info[i].ref_count = std::get<0>(counts);
chain_info[i].ex_count = std::get<1>(counts);
}
Expand All @@ -902,17 +935,23 @@ namespace verona::rt
Aal::pause();
}

if (new_slot->is_read_only())
if (chain_last_slot->is_read_only())
{
auto counts = handle_read_only_enqueue(prev_slot, new_slot, cown);
auto counts = handle_read_only_enqueue(
prev_slot,
chain_first_slot,
chain_last_slot,
chain_info[i].all_reads,
chain_info[i].first_consecutive_readers_count,
cown);
chain_info[i].ref_count = std::get<0>(counts);
chain_info[i].ex_count = std::get<1>(counts);
continue;
}

Logging::cout()
<< " Writer waiting for cown. Set next of previous slot cown "
<< *new_slot << " previous " << *prev_slot << Logging::endl;
<< *chain_last_slot << " previous " << *prev_slot << Logging::endl;
prev_slot->set_next_slot_writer(first_body);
yield();
}
Expand All @@ -931,8 +970,6 @@ namespace verona::rt
Logging::cout() << "Setting slot " << slot << " to ready"
<< Logging::endl;
slot->set_ready();

// TODO: We chould also set the READ_AVAILABLE here
}

// Fourth phase - Process & Resolve
Expand All @@ -942,11 +979,13 @@ namespace verona::rt
auto* cown = chain_info[i].cown;
auto first_body_index = chain_info[i].first_body_index;
auto* first_body = bodies[first_body_index];
auto* curr_slot = chain_info[i].last_slot;
auto* chain_first_slot = chain_info[i].first_slot;
auto chain_had_no_predecessor = chain_info[i].had_no_predecessor;
auto transfer_count = chain_info[i].transfer_count;
auto ref_count = chain_info[i].ref_count;
auto ex_count = chain_info[i].ex_count;
auto first_consecutive_readers_count =
chain_info[i].first_consecutive_readers_count;

// Process reference count
if (chain_had_no_predecessor)
Expand All @@ -956,26 +995,29 @@ namespace verona::rt
acquire_with_transfer(cown, transfer_count, ref_count);

// Process writes without predecessor
if ((chain_had_no_predecessor) && (!curr_slot->is_read_only()))
if ((chain_had_no_predecessor) && (!chain_first_slot->is_read_only()))
{
if (cown->read_ref_count.try_write())
{
Logging::cout() << " Writer at head of queue and got the cown "
<< *curr_slot << Logging::endl;
<< *chain_first_slot << Logging::endl;
ex_count++;
yield();
}
Comment on lines 1001 to 1006
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can ex_count be anything other than 0 in this branch?

Suggested change
{
Logging::cout() << " Writer at head of queue and got the cown "
<< *curr_slot << Logging::endl;
<< *chain_first_slot << Logging::endl;
ex_count++;
yield();
}
{
Logging::cout() << " Writer at head of queue and got the cown "
<< *chain_first_slot << Logging::endl;
ec[first_body_index] += 1;
yield();
continue;
}

else
{
Logging::cout() << " Writer waiting for previous readers cown "
<< *curr_slot << Logging::endl;
<< *chain_first_slot << Logging::endl;
yield();
cown->next_writer = first_body;
}
}

// Process execution count
ec[first_body_index] += ex_count;

for (int i = 1; i < first_consecutive_readers_count; i++)
ec[first_body_index + i] += 1;
Copy link
Member

@mjp41 mjp41 Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is ex_count not used here? It feels like the whole chain should get to go if the read available was seen during scheduling, but otherwise it shouldn't? Doesn't this increase the count on the chain apart from the first entry, if read available wasn't seen?

}

Comment on lines 1015 to 1022
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can treat ex_count as a read chain that observed the read was available. Could we do this? Perhaps with a rename of ex_count?

Suggested change
// Process execution count
ec[first_body_index] += ex_count;
for (int i = 1; i < first_consecutive_readers_count; i++)
ec[first_body_index + i] += 1;
}
// Process execution count
if (ex_count != 0)
{
for (int i = 0; i < first_consecutive_readers_count; i++)
ec[first_body_index + i] += 1;
}
}

for (size_t i = 0; i < body_count; i++)
Expand Down
98 changes: 98 additions & 0 deletions test/func/atomic-sched/atomic-sched.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,99 @@ void test_body()
});
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should have some more racy tests. Where there are concurrent attempts to schedule, and hence concurrent potential wake and schedule, and sleep. We don't seem to cover those cases here.

void test_body_read_mixed()
{
Logging::cout() << "test_body()" << Logging::endl;

auto log = make_cown<Body>();
auto log2 = make_cown<Body>();

(when(read(log)) <<
[=](acquired_cown<const Body> b) {
for (int i = 0; i < 10; i++)
{
Logging::cout() << "Behaviour 1\n";
// sleep(1);
}
}) +
(when(log2) << [=](auto) {
for (int i = 0; i < 10; i++)
{
Logging::cout() << "Behaviour 2\n";
// sleep(1);
}
});
}

void test_body_read_same1()
{
Logging::cout() << "test_body()" << Logging::endl;

auto log = make_cown<Body>();

(when(read(log)) <<
[=](acquired_cown<const Body> b) {
for (int i = 0; i < 10; i++)
{
Logging::cout() << "Behaviour 1\n";
// sleep(1);
}
}) +
(when(log) << [=](auto) {
for (int i = 0; i < 10; i++)
{
Logging::cout() << "Behaviour 2\n";
// sleep(1);
}
});
}

void test_body_read_same2()
{
Logging::cout() << "test_body()" << Logging::endl;

auto log = make_cown<Body>();

(when(log) <<
[=](auto b) {
for (int i = 0; i < 10; i++)
{
Logging::cout() << "Behaviour 1\n";
// sleep(1);
}
}) +
(when(read(log)) << [=](auto) {
for (int i = 0; i < 10; i++)
{
Logging::cout() << "Behaviour 2\n";
// sleep(1);
}
});
}

void test_body_read_only_same()
{
Logging::cout() << "test_body()" << Logging::endl;

auto log = make_cown<Body>();

(when(read(log)) <<
[=](acquired_cown<const Body> b) {
for (int i = 0; i < 10; i++)
{
Logging::cout() << "Behaviour 1\n";
// sleep(1);
}
}) +
(when(read(log)) << [=](auto) {
for (int i = 0; i < 10; i++)
{
Logging::cout() << "Behaviour 2\n";
// sleep(1);
}
});
}

void test_body_same()
{
Logging::cout() << "test_body_same()" << Logging::endl;
Expand Down Expand Up @@ -94,5 +187,10 @@ int main(int argc, char** argv)
harness.run(test_body_same);
harness.run(test_body_smart);

harness.run(test_body_read_mixed);
harness.run(test_body_read_only_same);
harness.run(test_body_read_same1);
harness.run(test_body_read_same2);

return 0;
}
Loading