Skip to content

Commit

Permalink
Merge pull request #35 from badrinathpatchikolla/spark-3.4
Browse files Browse the repository at this point in the history
Added failure exception handling method
  • Loading branch information
mantovani authored May 19, 2023
2 parents 94749c6 + ade67ec commit 50d2ef2
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 48 deletions.
19 changes: 10 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

To add the http.almaren dependency to your sbt build:
```
libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.6-3.3"
libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.7-3.4"
```

To run in spark-shell:

```
spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-3.3,com.github.music-of-the-ainur:almaren-framework_2.12:0.9.8-3.3"
spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.4,com.github.music-of-the-ainur:almaren-framework_2.12:0.9.9-3.4"
```

## Table of Contents
Expand Down Expand Up @@ -45,12 +45,14 @@ repository. It can be used using the `--packages` option or the

| version | Connector Artifact |
|----------------------------|-------------------------------------------------------------|
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.6-3.3` |
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-3.3` |
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-3.2` |
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-3.1` |
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-2.4` |
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:http-almaren_2.11:1.2.6-2.4` |
| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.7-3.4` |
| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.4` |
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.7-3.3` |
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.3` |
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.2` |
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.1` |
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-2.4` |
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:http-almaren_2.11:1.2.7-2.4` |


## Methods
Expand Down Expand Up @@ -163,7 +165,6 @@ Output:
| threadPoolSize | How many connections in parallel for each executor. parallelism = number of executors * number of cores * threadPoolSize | Int |
| batchSize | How many records a single thread will process | Int |


#### Special Columns

##### Input:
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ lazy val scala213 = "2.13.9"
crossScalaVersions := Seq(scala212,scala213)
ThisBuild / scalaVersion := scala213

val sparkVersion = "3.3.0"
val sparkVersion = "3.4.0"
val majorVersionReg = "([0-9]+\\.[0-9]+).{0,}".r

val majorVersionReg(majorVersion) = sparkVersion
Expand All @@ -17,7 +17,7 @@ scalacOptions ++= Seq("-deprecation", "-feature")
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.8-${majorVersion}" % "provided",
"com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.9-${majorVersion}" % "provided",
"com.lihaoyi" %% "requests" % "0.7.1",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
"org.scalatest" %% "scalatest" % "3.2.14" % "test"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package com.github.music.of.the.ainur.almaren.http

import java.util.concurrent.Executors

import com.github.music.of.the.ainur.almaren.Tree
import com.github.music.of.the.ainur.almaren.builder.Core
import com.github.music.of.the.ainur.almaren.state.core.Main
import org.apache.spark.sql.{DataFrame, Row}
import requests.Session
import requests.{RequestFailedException, Session}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
Expand All @@ -20,7 +19,7 @@ final case class Response(
`__STATUS_MSG__`:Option[String] = None,
`__ERROR__`:Option[String] = None,
`__ELAPSED_TIME__`:Long,
`__URL__`:String)
`__URL__`:String)

final case class ResponseBatch(
`__ID__`:Seq[String],
Expand All @@ -30,7 +29,7 @@ final case class ResponseBatch(
`__STATUS_MSG__`:Option[String] = None,
`__ERROR__`:Option[String] = None,
`__ELAPSED_TIME__`:Long,
`__URL__`:String,
`__URL__`:String,
`__DATA__`:String)


Expand Down Expand Up @@ -66,32 +65,37 @@ private[almaren] case class HTTP(
rows.map(row => request(row,s))
})
val requests:Future[Iterator[Seq[Response]]] = Future.sequence(data)
Await.result(requests,Duration.Inf).flatMap(s => s)
Await.result(requests, Duration.Inf).flatten
})
result.toDF
}

private def request(row:Row, session:Session): Response = {
val url = row.getAs[Any](Alias.UrlCol).toString()
val url = row.getAs[Any](Alias.UrlCol).toString
val startTime = System.currentTimeMillis()
val response = Try(requestHandler(row,session,url,headers,params ++ hiddenParams,method,connectTimeout,readTimeout))
val elapsedTime = System.currentTimeMillis() - startTime
val id = row.getAs[Any](Alias.IdCol).toString()
val id = row.getAs[Any](Alias.IdCol).toString

def getResponse(r: requests.Response) = Response(
id,
Some(r.text()),
r.headers,
Some(r.statusCode),
r.statusMessage match {
case null => None
case _ => Some(r.statusMessage)
},
`__ELAPSED_TIME__` = elapsedTime,
`__URL__` = url
)

response match {
case Success(r) => Response(
id,
Some(r.text()),
r.headers,
Some(r.statusCode),
r.statusMessage match {
case null => None
case _ => Some(r.statusMessage)
},
`__ELAPSED_TIME__` = elapsedTime,
`__URL__` = url)
case Success(r) => getResponse(r)
case Failure(re: RequestFailedException) => getResponse(re.response)
case Failure(f) => {
logger.error("Almaren HTTP Request Error", f)
Response(id, `__ERROR__` = Some(f.getMessage()), `__ELAPSED_TIME__` = elapsedTime, `__URL__` = url)
Response(id, `__ERROR__` = Some(f.getMessage), `__ELAPSED_TIME__` = elapsedTime, `__URL__` = url)
}
}
}
Expand Down Expand Up @@ -121,38 +125,40 @@ private[almaren] case class HTTPBatch(
val s = session()
val data = batchDelimiter(rows)
val startTime = System.currentTimeMillis()
def getResponse(r: requests.Response) = ResponseBatch(
rows.map(row => row.getAs[Any](Alias.IdCol).toString),
Some(r.text()),
r.headers,
Some(r.statusCode),
r.statusMessage match {
case null => None
case _ => Some(r.statusMessage)
},
`__ELAPSED_TIME__` = System.currentTimeMillis() - startTime,
`__URL__` = url,
`__DATA__` = data
)
Try{request(data,s)} match {
case Success(r) => ResponseBatch(
rows.map(row => row.getAs[Any](Alias.IdCol).toString()),
Some(r.text()),
r.headers,
Some(r.statusCode),
r.statusMessage match {
case null => None
case _ => Some(r.statusMessage)
},
`__ELAPSED_TIME__` = System.currentTimeMillis() - startTime,
`__URL__` = url,
`__DATA__` = data
)
case Success(r) => getResponse(r)
case Failure(re: RequestFailedException) => getResponse(re.response)
case Failure(f) => {
logger.error("Almaren HTTP Batch Request Error", f)
ResponseBatch(
rows.map(row => row.getAs[Any](Alias.IdCol).toString()),
`__ERROR__` = Some(f.getMessage()),
rows.map(row => row.getAs[Any](Alias.IdCol).toString),
`__ERROR__` = Some(f.getMessage),
`__ELAPSED_TIME__` = System.currentTimeMillis() - startTime,
`__URL__` = url,
`__DATA__` = data
)

}
}
})
})
result.toDF
}

private def request(data:String, session:Session): requests.Response =
private def request(data:String, session:Session): requests.Response =
requestHandler(data,session,url,headers,params ++ hiddenParams,method,connectTimeout,readTimeout)


Expand Down Expand Up @@ -242,4 +248,4 @@ object HTTPConn {
val defaultSession = () => requests.Session()

implicit class HTTPImplicit(val container: Option[Tree]) extends HTTPConnector
}
}

0 comments on commit 50d2ef2

Please sign in to comment.