Skip to content

Commit

Permalink
Support Uncompressed Files
Browse files Browse the repository at this point in the history
PR found  linkedin#357
  • Loading branch information
AbdelrahmanMosly committed Aug 8, 2023
1 parent 282d77b commit 14574c6
Showing 1 changed file with 15 additions and 11 deletions.
26 changes: 15 additions & 11 deletions app/com/linkedin/drelephant/util/SparkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,11 @@ trait SparkUtils {
}
case None => {
val (logPath, codecName) = getLogPathAndCodecName(fs, fs.getUri.resolve(basePath.toUri), appId)

(logPath, Some(compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(sparkConf, codecName))))
if(codecName == UNCOMPRESSED){
(logPath, None)
} else {
(logPath, Some(compressionCodecMap.getOrElseUpdate(codecName, loadCompressionCodec(sparkConf, codecName))))
}
}
}

Expand Down Expand Up @@ -181,6 +184,7 @@ trait SparkUtils {

private val IN_PROGRESS = ".inprogress"
private val DEFAULT_COMPRESSION_CODEC = "snappy"
private val UNCOMPRESSED = "uncompressed"

private val compressionCodecClassNamesByShortName = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
Expand Down Expand Up @@ -227,14 +231,14 @@ trait SparkUtils {
val nameAndExtension = logPath.split('.')
if( nameAndExtension.length == 2 ) {
extension = Some(nameAndExtension(1))
val name = nameAndExtension(0)
val appIdAndAttempt = name.split('_')
if( appIdAndAttempt.length == 4 ) {
attempt = Some(appIdAndAttempt(3))
appId = Some(appIdAndAttempt.dropRight(1).mkString("_"))
} else {
appId = Some(name)
}
}
val name = nameAndExtension(0)
val appIdAndAttempt = name.split('_')
if (appIdAndAttempt.length == 4) {
attempt = Some(appIdAndAttempt(3))
appId = Some(appIdAndAttempt.dropRight(1).mkString("_"))
} else {
appId = Some(name)
}
(appId, attempt, extension)
}
Expand Down Expand Up @@ -272,7 +276,7 @@ trait SparkUtils {
"_" + sanitize(finalAttempt._2.get) +
"." + finalAttempt._3.get), finalAttempt._3.get)
// if codec is not available, but we found a file match with appId, use the actual file Path from the first match
case nocodec if nocodec._1 != None & nocodec._3 == None => (attemptsList(0).getPath(), DEFAULT_COMPRESSION_CODEC)
case nocodec if nocodec._1 != None & nocodec._3 == None => (attemptsList(0).getPath(), UNCOMPRESSED)

// This should be reached only if we can't parse the filename in the path.
// Try to construct a general path in that case.
Expand Down

0 comments on commit 14574c6

Please sign in to comment.