Skip to content

Commit

Permalink
SWDEV-475341 - Fix stream resolution for graph launches
Browse files Browse the repository at this point in the history
This issue occurred because of incorrect usage of the getStream call:
if we first resolve the null stream, typecast it, and then call
getStream again, we lose the advantage of simply passing "nullptr" to
indicate the NULL stream. As a result, we enter the waitActiveStream
call and add barriers to synchronize across streams.

Change-Id: I94dc4e3ec927295b9e1ab6dee4b37d7d3e00b0cc
  • Loading branch information
saleelk authored and rakesroy committed Jul 30, 2024
1 parent 6b1a453 commit d92c4ff
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 43 deletions.
33 changes: 15 additions & 18 deletions hipamd/src/hip_graph_internal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ hipError_t EnqueueGraphWithSingleList(std::vector<hip::Node>& topoOrder, hip::St
} else {
topoOrder[i]->SetStream(hip_stream, graphExec);
status = topoOrder[i]->CreateCommand(topoOrder[i]->GetQueue());
topoOrder[i]->EnqueueCommands(reinterpret_cast<hipStream_t>(hip_stream));
topoOrder[i]->EnqueueCommands(hip_stream);
}
}

Expand All @@ -617,17 +617,14 @@ hipError_t EnqueueGraphWithSingleList(std::vector<hip::Node>& topoOrder, hip::St
return status;
}

hipError_t GraphExec::Run(hipStream_t stream) {
hipError_t GraphExec::Run(hipStream_t graph_launch_stream) {
hipError_t status = hipSuccess;

if (hip::getStream(stream) == nullptr) {
return hipErrorInvalidResourceHandle;
}
auto hip_stream = (stream == nullptr) ? hip::getCurrentDevice()->NullStream()
: reinterpret_cast<hip::Stream*>(stream);
hip::Stream* launch_stream = hip::getStream(graph_launch_stream);

if (flags_ & hipGraphInstantiateFlagAutoFreeOnLaunch) {
if (!topoOrder_.empty()) {
topoOrder_[0]->GetParentGraph()->FreeAllMemory(hip_stream);
topoOrder_[0]->GetParentGraph()->FreeAllMemory(launch_stream);
}
}

Expand All @@ -644,31 +641,31 @@ hipError_t GraphExec::Run(hipStream_t stream) {
}

if (parallelLists_.size() == 1 &&
instantiateDeviceId_ == hip_stream->DeviceId()) {
instantiateDeviceId_ == launch_stream->DeviceId()) {
if (DEBUG_CLR_GRAPH_PACKET_CAPTURE) {
// If the graph has kernels that does device side allocation, during packet capture, heap is
// allocated because heap pointer has to be added to the AQL packet, and initialized during
// graph launch.
static bool initialized = false;
if (!initialized && HasHiddenHeap()) {
hip_stream->vdev()->HiddenHeapInit();
launch_stream->vdev()->HiddenHeapInit();
initialized = true;
}
}
status = EnqueueGraphWithSingleList(topoOrder_, hip_stream, this);
status = EnqueueGraphWithSingleList(topoOrder_, launch_stream, this);
} else if (parallelLists_.size() == 1 &&
instantiateDeviceId_ != hip_stream->DeviceId()) {
instantiateDeviceId_ != launch_stream->DeviceId()) {
for (int i = 0; i < topoOrder_.size(); i++) {
topoOrder_[i]->SetStream(hip_stream, this);
topoOrder_[i]->SetStream(launch_stream, this);
status = topoOrder_[i]->CreateCommand(topoOrder_[i]->GetQueue());
topoOrder_[i]->EnqueueCommands(stream);
topoOrder_[i]->EnqueueCommands(launch_stream);
}
} else {
UpdateStream(parallelLists_, hip_stream, this);
UpdateStream(parallelLists_, launch_stream, this);
amd::Command* rootCommand = nullptr;
amd::Command* endCommand = nullptr;
status = FillCommands(parallelLists_, nodeWaitLists_, topoOrder_, clonedGraph_, rootCommand,
endCommand, hip_stream);
endCommand, launch_stream);
if (status != hipSuccess) {
return status;
}
Expand All @@ -677,15 +674,15 @@ hipError_t GraphExec::Run(hipStream_t stream) {
rootCommand->release();
}
for (int i = 0; i < topoOrder_.size(); i++) {
topoOrder_[i]->EnqueueCommands(reinterpret_cast<hipStream_t>(topoOrder_[i]->GetQueue()));
topoOrder_[i]->EnqueueCommands(topoOrder_[i]->GetQueue());
}
if (endCommand != nullptr) {
endCommand->enqueue();
endCommand->release();
}
}
amd::ScopedLock lock(GraphExecStatusLock_);
GraphExecStatus_[this] = std::make_pair(hip_stream, false);
GraphExecStatus_[this] = std::make_pair(launch_stream, false);
ResetQueueIndex();
return status;
}
Expand Down
49 changes: 24 additions & 25 deletions hipamd/src/hip_graph_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ struct GraphNode : public hipGraphNodeDOTAttribute {
}
virtual hipError_t GetNumParallelStreams(size_t &num) { return hipSuccess; }
/// Enqueue commands part of the node
virtual void EnqueueCommands(hipStream_t stream) {
virtual void EnqueueCommands(hip::Stream* stream) {
// If the node is disabled it becomes empty node. To maintain ordering just enqueue marker.
// Node can be enabled/disabled only for kernel, memcpy and memset nodes.
if (!isEnabled_ &&
Expand All @@ -335,8 +335,7 @@ struct GraphNode : public hipGraphNodeDOTAttribute {
if (!commands_.empty()) {
waitList = commands_[0]->eventWaitList();
}
hip::Stream* hip_stream = hip::getStream(stream);
amd::Command* command = new amd::Marker(*hip_stream, !kMarkerDisableFlush, waitList);
amd::Command* command = new amd::Marker(*stream, !kMarkerDisableFlush, waitList);
command->enqueue();
command->release();
return;
Expand Down Expand Up @@ -757,10 +756,10 @@ struct ChildGraphNode : public GraphNode {
bool TopologicalOrder(std::vector<Node>& TopoOrder) override {
return childGraph_->TopologicalOrder(TopoOrder);
}
void EnqueueCommands(hipStream_t stream) override {
void EnqueueCommands(hip::Stream* stream) override {
if (graphCaptureStatus_) {
hipError_t status =
EnqueueGraphWithSingleList(childGraphNodeOrder_, reinterpret_cast<hip::Stream*>(stream));
EnqueueGraphWithSingleList(childGraphNodeOrder_, stream);
} else {
// enqueue child graph start command
if (startCommand_ != nullptr) {
Expand Down Expand Up @@ -822,16 +821,15 @@ class GraphKernelNode : public GraphNode {
size_t GetKernargSegmentByteSize() const { return kernargSegmentByteSize_; }
size_t GetKernargSegmentAlignment() const { return kernargSegmentAlignment_; }
bool HasHiddenHeap() const { return hasHiddenHeap_; }
void EnqueueCommands(hipStream_t stream) override {
void EnqueueCommands(hip::Stream* stream) override {
// If the node is disabled it becomes empty node. To maintain ordering just enqueue marker.
// Node can be enabled/disabled only for kernel, memcpy and memset nodes.
if (!isEnabled_) {
amd::Command::EventWaitList waitList;
if (!commands_.empty()) {
waitList = commands_[0]->eventWaitList();
}
hip::Stream* hip_stream = hip::getStream(stream);
amd::Command* command = new amd::Marker(*hip_stream, !kMarkerDisableFlush, waitList);
amd::Command* command = new amd::Marker(*stream, !kMarkerDisableFlush, waitList);
command->enqueue();
command->release();
return;
Expand All @@ -845,6 +843,7 @@ class GraphKernelNode : public GraphNode {
command->release();
}
}

void PrintAttributes(std::ostream& out, hipGraphDebugDotFlags flag) override {
out << "[";
out << "style";
Expand Down Expand Up @@ -1284,12 +1283,12 @@ class GraphMemcpyNode : public GraphNode {
return status;
}

virtual void EnqueueCommands(hipStream_t stream) override {
virtual void EnqueueCommands(hip::Stream* stream) override {
if ( (copyParams_.kind == hipMemcpyHostToHost || copyParams_.kind == hipMemcpyDefault) &&
isEnabled_ && IsHtoHMemcpy(copyParams_.dstPtr.ptr, copyParams_.srcPtr.ptr)) {
ihipHtoHMemcpy(copyParams_.dstPtr.ptr, copyParams_.srcPtr.ptr,
copyParams_.extent.width * copyParams_.extent.height *
copyParams_.extent.depth, *hip::getStream(stream));
copyParams_.extent.depth, *stream);
return;
}
GraphNode::EnqueueCommands(stream);
Expand Down Expand Up @@ -1438,7 +1437,7 @@ class GraphMemcpyNode1D : public GraphMemcpyNode {
return status;
}

virtual void EnqueueCommands(hipStream_t stream) override {
virtual void EnqueueCommands(hip::Stream* stream) override {
bool isH2H = false;
if ((kind_ == hipMemcpyHostToHost || kind_ == hipMemcpyDefault) && IsHtoHMemcpy(dst_, src_)) {
isH2H = true;
Expand All @@ -1451,22 +1450,21 @@ class GraphMemcpyNode1D : public GraphMemcpyNode {
if (isEnabled_) {
//HtoH
if (isH2H) {
ihipHtoHMemcpy(dst_, src_, count_, *hip::getStream(stream));
ihipHtoHMemcpy(dst_, src_, count_, *stream);
return;
}
amd::Command* command = commands_[0];
amd::HostQueue* cmdQueue = command->queue();
hip::Stream* hip_stream = hip::getStream(stream);

if (cmdQueue == hip_stream) {
if (cmdQueue == stream) {
command->enqueue();
command->release();
return;
}

amd::Command::EventWaitList waitList;
amd::Command* depdentMarker = nullptr;
amd::Command* cmd = hip_stream->getLastQueuedCommand(true);
amd::Command* cmd = stream->getLastQueuedCommand(true);
if (cmd != nullptr) {
waitList.push_back(cmd);
amd::Command* depdentMarker = new amd::Marker(*cmdQueue, true, waitList);
Expand All @@ -1483,7 +1481,7 @@ class GraphMemcpyNode1D : public GraphMemcpyNode {
if (cmd != nullptr) {
waitList.clear();
waitList.push_back(cmd);
amd::Command* depdentMarker = new amd::Marker(*hip_stream, true, waitList);
amd::Command* depdentMarker = new amd::Marker(*stream, true, waitList);
if (depdentMarker != nullptr) {
depdentMarker->enqueue(); // Make sure future commands of queue synced with command
depdentMarker->release();
Expand All @@ -1492,8 +1490,7 @@ class GraphMemcpyNode1D : public GraphMemcpyNode {
}
} else {
amd::Command::EventWaitList waitList;
hip::Stream* hip_stream = hip::getStream(stream);
amd::Command* command = new amd::Marker(*hip_stream, !kMarkerDisableFlush, waitList);
amd::Command* command = new amd::Marker(*stream, !kMarkerDisableFlush, waitList);
command->enqueue();
command->release();
}
Expand Down Expand Up @@ -1973,11 +1970,12 @@ class GraphEventRecordNode : public GraphNode {
return status;
}

void EnqueueCommands(hipStream_t stream) override {
void EnqueueCommands(hip::Stream* stream) override {
if (!commands_.empty()) {
hip::Event* e = reinterpret_cast<hip::Event*>(event_);
// command release during enqueueRecordCommand
hipError_t status = e->enqueueRecordCommand(stream, commands_[0], true);
hipError_t status = e->enqueueRecordCommand(
reinterpret_cast<hipStream_t>(stream), commands_[0], true);
if (status != hipSuccess) {
ClPrint(amd::LOG_ERROR, amd::LOG_CODE,
"[hipGraph] Enqueue event record command failed for node %p - status %d", this,
Expand Down Expand Up @@ -2026,10 +2024,11 @@ class GraphEventWaitNode : public GraphNode {
return status;
}

void EnqueueCommands(hipStream_t stream) override {
void EnqueueCommands(hip::Stream* stream) override {
if (!commands_.empty()) {
hip::Event* e = reinterpret_cast<hip::Event*>(event_);
hipError_t status = e->enqueueStreamWaitCommand(stream, commands_[0]);
hipError_t status =
e->enqueueStreamWaitCommand(reinterpret_cast<hipStream_t>(stream), commands_[0]);
if (status != hipSuccess) {
ClPrint(amd::LOG_ERROR, amd::LOG_CODE,
"[hipGraph] Enqueue stream wait command failed for node %p - status %d", this,
Expand Down Expand Up @@ -2087,7 +2086,7 @@ class GraphHostNode : public GraphNode {
NodeParams->fn(NodeParams->userData);
}

void EnqueueCommands(hipStream_t stream) override {
void EnqueueCommands(hip::Stream* stream) override {
if (!commands_.empty()) {
if (!commands_[0]->setCallback(CL_COMPLETE, GraphHostNode::Callback, &NodeParams_)) {
ClPrint(amd::LOG_ERROR, amd::LOG_CODE, "[hipGraph] Failed during setCallback");
Expand Down Expand Up @@ -2411,7 +2410,7 @@ class GraphDrvMemcpyNode : public GraphNode {
return status;
}

void EnqueueCommands(hipStream_t stream) override {
void EnqueueCommands(hip::Stream* stream) override {
bool isHtoH = false;
if(copyParams_.srcMemoryType == hipMemoryTypeHost &&
copyParams_.dstMemoryType == hipMemoryTypeHost &&
Expand All @@ -2421,7 +2420,7 @@ class GraphDrvMemcpyNode : public GraphNode {
if (isEnabled_ && isHtoH) {
ihipHtoHMemcpy(copyParams_.dstHost, copyParams_.srcHost,
copyParams_.WidthInBytes * copyParams_.Height *
copyParams_.Depth, *hip::getStream(stream));
copyParams_.Depth, *stream);
return;
}
GraphNode::EnqueueCommands(stream);
Expand Down

0 comments on commit d92c4ff

Please sign in to comment.