Skip to content

Commit

Permalink
Merge pull request #36 from badrinathpatchikolla/spark-3.1
Browse files Browse the repository at this point in the history
Added failure exception handling method
  • Loading branch information
mantovani authored May 19, 2023
2 parents 5918691 + f381e37 commit d049687
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 40 deletions.
18 changes: 10 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
To add http.almaren Connector to your sbt build:

```
libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.6-3.1"
libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.7-3.1"
```
To run in spark-shell:

```
spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.8-3.1,com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-3.1"
spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.9-3.1,com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.1"
```
## Table of Contents

Expand Down Expand Up @@ -40,12 +40,14 @@ HTTP Connector is available in [Maven Central](https://mvnrepository.com/artifac

| versions | Connector Artifact |
|----------------------------|-------------------------------------------------------------|
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.6-3.3` |
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-3.3` |
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-3.2` |
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-3.1` |
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-2.4` |
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:http-almaren_2.11:1.2.6-2.4` |
| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.7-3.4` |
| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.4` |
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.7-3.3` |
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.3` |
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.2` |
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.1` |
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-2.4` |
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:http-almaren_2.11:1.2.7-2.4` |

## Methods

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ scalacOptions ++= Seq("-deprecation", "-feature")
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.8-${majorVersion}" % "provided",
"com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.9-${majorVersion}" % "provided",
"com.lihaoyi" %% "requests" % "0.7.1",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
"org.scalatest" %% "scalatest" % "3.2.14" % "test"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import com.github.music.of.the.ainur.almaren.Tree
import com.github.music.of.the.ainur.almaren.builder.Core
import com.github.music.of.the.ainur.almaren.state.core.Main
import org.apache.spark.sql.{DataFrame, Row}
import requests.Session
import requests.{RequestFailedException, Session}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
Expand Down Expand Up @@ -66,32 +66,37 @@ private[almaren] case class HTTP(
rows.map(row => request(row,s))
})
val requests:Future[Iterator[Seq[Response]]] = Future.sequence(data)
Await.result(requests,Duration.Inf).flatMap(s => s)
Await.result(requests, Duration.Inf).flatten
})
result.toDF
}

private def request(row:Row, session:Session): Response = {
val url = row.getAs[Any](Alias.UrlCol).toString()
val url = row.getAs[Any](Alias.UrlCol).toString
val startTime = System.currentTimeMillis()
val response = Try(requestHandler(row,session,url,headers,params ++ hiddenParams,method,connectTimeout,readTimeout))
val elapsedTime = System.currentTimeMillis() - startTime
val id = row.getAs[Any](Alias.IdCol).toString()
val id = row.getAs[Any](Alias.IdCol).toString

def getResponse(r: requests.Response) = Response(
id,
Some(r.text()),
r.headers,
Some(r.statusCode),
r.statusMessage match {
case null => None
case _ => Some(r.statusMessage)
},
`__ELAPSED_TIME__` = elapsedTime,
`__URL__` = url
)

response match {
case Success(r) => Response(
id,
Some(r.text()),
r.headers,
Some(r.statusCode),
r.statusMessage match {
case null => None
case _ => Some(r.statusMessage)
},
`__ELAPSED_TIME__` = elapsedTime,
`__URL__` = url)
case Success(r) => getResponse(r)
case Failure(re: RequestFailedException) => getResponse(re.response)
case Failure(f) => {
logger.error("Almaren HTTP Request Error", f)
Response(id, `__ERROR__` = Some(f.getMessage()), `__ELAPSED_TIME__` = elapsedTime, `__URL__` = url)
Response(id, `__ERROR__` = Some(f.getMessage), `__ELAPSED_TIME__` = elapsedTime, `__URL__` = url)
}
}
}
Expand Down Expand Up @@ -121,25 +126,29 @@ private[almaren] case class HTTPBatch(
val s = session()
val data = batchDelimiter(rows)
val startTime = System.currentTimeMillis()

def getResponse(r: requests.Response) = ResponseBatch(
rows.map(row => row.getAs[Any](Alias.IdCol).toString),
Some(r.text()),
r.headers,
Some(r.statusCode),
r.statusMessage match {
case null => None
case _ => Some(r.statusMessage)
},
`__ELAPSED_TIME__` = System.currentTimeMillis() - startTime,
`__URL__` = url,
`__DATA__` = data
)

Try{request(data,s)} match {
case Success(r) => ResponseBatch(
rows.map(row => row.getAs[Any](Alias.IdCol).toString()),
Some(r.text()),
r.headers,
Some(r.statusCode),
r.statusMessage match {
case null => None
case _ => Some(r.statusMessage)
},
`__ELAPSED_TIME__` = System.currentTimeMillis() - startTime,
`__URL__` = url,
`__DATA__` = data
)
case Success(r) => getResponse(r)
case Failure(re: RequestFailedException) => getResponse(re.response)
case Failure(f) => {
logger.error("Almaren HTTP Batch Request Error", f)
ResponseBatch(
rows.map(row => row.getAs[Any](Alias.IdCol).toString()),
`__ERROR__` = Some(f.getMessage()),
rows.map(row => row.getAs[Any](Alias.IdCol).toString),
`__ERROR__` = Some(f.getMessage),
`__ELAPSED_TIME__` = System.currentTimeMillis() - startTime,
`__URL__` = url,
`__DATA__` = data
Expand Down

0 comments on commit d049687

Please sign in to comment.