Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

package nextflow.processor

import nextflow.Global
import nextflow.Session
import nextflow.fusion.FusionHelper

import static nextflow.processor.TaskStatus.*

import java.nio.file.NoSuchFileException
Expand Down Expand Up @@ -249,6 +253,9 @@ abstract class TaskHandler {
catch( IOException e ) {
log.debug "[WARN] Cannot read trace file: $file -- Cause: ${e.message}"
}
// If Fusion is enabled, parse the accelerator usage from .command.log
if( Global.session && FusionHelper.isFusionEnabled(Global.session as Session) )
record.parseFusionAccelerator(task.workDir?.resolve(TaskRun.CMD_LOG))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't sound great parsing a log file to determine this capability

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, but this is how Fusion currently provides the GPU metric. This is why I'm keeping it as a draft.

}

return record
Expand Down
45 changes: 45 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ class TraceRecord implements Serializable {
transient private CloudMachineInfo machineInfo
transient private ContainerMeta containerMeta
transient private Integer numSpotInterruptions
// Accelerator (GPU) usage flag as parsed from the Fusion log; null when not available. Declared transient.
transient private Boolean accelerator

/**
* Convert the given value to a string
Expand Down Expand Up @@ -627,4 +628,48 @@ class TraceRecord implements Serializable {
/**
 * Stores the container metadata associated with this trace record.
 *
 * @param meta the container metadata to keep in this record
 */
void setContainerMeta(ContainerMeta meta) {
    containerMeta = meta
}

/**
 * @return the accelerator flag held by this record, or {@code null} when it has not been set
 */
Boolean getAccelerator() {
    return this.accelerator
}

/**
 * Stores the accelerator flag in this record.
 *
 * @param acc the accelerator flag; {@code null} is stored as-is
 */
void setAccelerator(Boolean acc) {
    accelerator = acc
}

/**
 * Reads the Fusion accelerator flag from the given log file and stores
 * the outcome in this record, overwriting any previous value.
 *
 * @param file the log file to inspect
 */
void parseFusionAccelerator(Path file) {
    final Boolean detected = parseFusionAccelerator0(file)
    this.accelerator = detected
}

/**
 * Parses the Fusion accelerator value.
 * Fusion writes FUSION_GPU_USED=true|false in the first line of the log file.
 * Only that first line is inspected; leading/trailing blanks and the letter
 * case of the value are ignored.
 *
 * @param file the log file to inspect; may be {@code null}
 * @return {@code true} or {@code false} as found in the file, or {@code null} when
 *      the file is {@code null}, missing, unreadable, empty, or its first line does
 *      not hold a valid {@code FUSION_GPU_USED} entry
 */
private Boolean parseFusionAccelerator0(Path file) {
    final String prefix = "FUSION_GPU_USED="

    // callers may hand over a null path; treat it like a missing file instead of failing
    if( file == null || !file.exists() ) {
        return null
    }

    String line
    try {
        line = file.withReader { it.readLine() }
    }
    catch( IOException e ) {
        // best effort: the file may vanish or become unreadable after the existence
        // check; the accelerator flag is optional, so report it as unknown
        return null
    }

    // readLine() yields null for an empty file
    line = line?.trim()
    if( !line || !line.startsWith(prefix) ) {
        return null
    }

    String value = line.substring(prefix.length()).trim()
    if( value.equalsIgnoreCase("true") ) {
        return Boolean.TRUE
    }
    if( value.equalsIgnoreCase("false") ) {
        return Boolean.FALSE
    }
    // anything else (empty, numeric, arbitrary text) is not a valid flag
    return null
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -369,4 +369,78 @@ class TraceRecordTest extends Specification {
rec2.getNumSpotInterruptions() == null
}

// Checks the accelerator getter/setter (both method and property access)
// and that the value is not carried over by serialize/deserialize.
def 'should manage accelerator field and not persist it across serialization'() {
given:
def rec = new TraceRecord()

expect:
// a freshly created record carries no accelerator value
rec.getAccelerator() == null
and:
rec.accelerator == null

when:
rec.setAccelerator(true)

then:
rec.getAccelerator() == true
rec.accelerator == true

when:
rec.setAccelerator(false)

then:
rec.getAccelerator() == false
rec.accelerator == false

when:
// round-trip the record while the accelerator is set to false
def buf = rec.serialize()
def rec2 = TraceRecord.deserialize(buf)

then:
// the value is gone after deserialization
rec2.getAccelerator() == null
}

// Data-driven check of parseFusionAccelerator: each CONTENT value is written to a
// temporary file and the resulting accelerator value is compared with EXPECTED.
@Unroll
def 'should parse fusion accelerator from file'() {
given:
def rec = new TraceRecord()
def file = TestHelper.createInMemTempFile('fusion-log')
file.text = CONTENT

when:
rec.parseFusionAccelerator(file)

then:
rec.getAccelerator() == EXPECTED

// Rows cover: valid values in either letter case, surrounding blanks, extra lines
// after the first one, and first lines that must yield null (wrong key, missing or
// invalid value, empty file).
where:
CONTENT | EXPECTED
'FUSION_GPU_USED=true\n' | true
'FUSION_GPU_USED=false\n' | false
'FUSION_GPU_USED=TRUE\n' | true
'FUSION_GPU_USED=FALSE\n' | false
' FUSION_GPU_USED=true \n' | true
' FUSION_GPU_USED=false \n' | false
'FUSION_GPU_USED=true \nother line' | true
'FUSION_GPU_USED=false\nother line' | false
'other content\n' | null
'FUSION_GPU=true\n' | null
'FUSION_GPU_USED=\n' | null
'FUSION_GPU_USED=invalid\n' | null
'FUSION_GPU_USED=123\n' | null
'' | null
}

// Checks that parsing a path that does not exist leaves the accelerator as null.
def 'should parse fusion accelerator when file does not exist'() {
given:
def rec = new TraceRecord()
def file = Path.of('/non/existent/file.log')

when:
rec.parseFusionAccelerator(file)

then:
rec.getAccelerator() == null
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@ class TowerClient implements TraceObserverV2 {
record.machineType = trace.getMachineInfo()?.type
record.priceModel = trace.getMachineInfo()?.priceModel?.toString()
record.numSpotInterruptions = trace.getNumSpotInterruptions()
record.accelerator = trace.getAccelerator()

return record
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,4 +560,29 @@ class TowerClientTest extends Specification {
req.tasks[0].numSpotInterruptions == 3
}

// Checks that the accelerator value set on a trace record shows up
// in the task entry of the request built by makeTasksReq.
def 'should include accelerator in task map'() {
given:
def client = Spy(new TowerClient())
// stub the progress lookup so the request can be built without a running workflow
client.getWorkflowProgress(true) >> new WorkflowProgress()

def now = System.currentTimeMillis()
def trace = new TraceRecord([
taskId: 42,
process: 'foo',
workdir: "/work/dir",
cpus: 1,
submit: now-2000,
start: now-1000,
complete: now
])
trace.setAccelerator(true)

when:
def req = client.makeTasksReq([trace])

then:
req.tasks.size() == 1
req.tasks[0].accelerator == true
}

}
Loading