diff --git a/README.md b/README.md index e1db62c..4b428b6 100644 --- a/README.md +++ b/README.md @@ -5,12 +5,12 @@ To add http.almaren Connector to your sbt build: ``` -libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.6-3.1" +libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.7-3.1" ``` To run in spark-shell: ``` -spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.8-3.1,com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-3.1" +spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.9-3.1,com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.1" ``` ## Table of Contents @@ -40,12 +40,14 @@ HTTP Connector is available in [Maven Central](https://mvnrepository.com/artifac | versions | Connector Artifact | |----------------------------|-------------------------------------------------------------| -| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.6-3.3` | -| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-3.3` | -| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-3.2` | -| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-3.1` | -| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.6-2.4` | -| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:http-almaren_2.11:1.2.6-2.4` | +| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.7-3.4` | +| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.4` | +| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.7-3.3` | +| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.3` | +| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.2` | +| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-3.1` | +| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.7-2.4` | +| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:http-almaren_2.11:1.2.7-2.4` | ## Methods diff --git a/build.sbt b/build.sbt index 62ecb60..76915bd 100644 --- a/build.sbt +++ b/build.sbt @@ -15,7 +15,7 @@ scalacOptions ++= Seq("-deprecation", "-feature") libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", - "com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.8-${majorVersion}" % "provided", + "com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.9-${majorVersion}" % "provided", "com.lihaoyi" %% "requests" % "0.7.1", "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", "org.scalatest" %% "scalatest" % "3.2.14" % "test" diff --git a/src/main/scala/com/github/music/of/the/ainur/almaren/http/HTTPConnector.scala b/src/main/scala/com/github/music/of/the/ainur/almaren/http/HTTPConnector.scala index 0c746a0..ee058b6 100644 --- a/src/main/scala/com/github/music/of/the/ainur/almaren/http/HTTPConnector.scala +++ b/src/main/scala/com/github/music/of/the/ainur/almaren/http/HTTPConnector.scala @@ -6,7 +6,7 @@ import com.github.music.of.the.ainur.almaren.Tree import com.github.music.of.the.ainur.almaren.builder.Core import com.github.music.of.the.ainur.almaren.state.core.Main import org.apache.spark.sql.{DataFrame, Row} -import requests.Session +import requests.{RequestFailedException, Session} import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration.Duration @@ -66,32 +66,37 @@ private[almaren] case class HTTP( rows.map(row => request(row,s)) }) val requests:Future[Iterator[Seq[Response]]] = Future.sequence(data) - Await.result(requests,Duration.Inf).flatMap(s => s) + Await.result(requests, Duration.Inf).flatten }) result.toDF } private def request(row:Row, session:Session): Response = { - val url = row.getAs[Any](Alias.UrlCol).toString() + val url = row.getAs[Any](Alias.UrlCol).toString val startTime = System.currentTimeMillis() val response = Try(requestHandler(row,session,url,headers,params ++ hiddenParams,method,connectTimeout,readTimeout)) val elapsedTime = System.currentTimeMillis() - startTime - val id = row.getAs[Any](Alias.IdCol).toString() + val id = row.getAs[Any](Alias.IdCol).toString + + def getResponse(r: requests.Response) = Response( + id, + Some(r.text()), + r.headers, + Some(r.statusCode), + r.statusMessage match { + case null => None + case _ => Some(r.statusMessage) + }, + `__ELAPSED_TIME__` = elapsedTime, + `__URL__` = url + ) + response match { - case Success(r) => Response( - id, - Some(r.text()), - r.headers, - Some(r.statusCode), - r.statusMessage match { - case null => None - case _ => Some(r.statusMessage) - }, - `__ELAPSED_TIME__` = elapsedTime, - `__URL__` = url) + case Success(r) => getResponse(r) + case Failure(re: RequestFailedException) => getResponse(re.response) case Failure(f) => { logger.error("Almaren HTTP Request Error", f) - Response(id, `__ERROR__` = Some(f.getMessage()), `__ELAPSED_TIME__` = elapsedTime, `__URL__` = url) + Response(id, `__ERROR__` = Some(f.getMessage), `__ELAPSED_TIME__` = elapsedTime, `__URL__` = url) } } } @@ -121,25 +126,29 @@ private[almaren] case class HTTPBatch( val s = session() val data = batchDelimiter(rows) val startTime = System.currentTimeMillis() + + def getResponse(r: requests.Response) = ResponseBatch( + rows.map(row => row.getAs[Any](Alias.IdCol).toString), + Some(r.text()), + r.headers, + Some(r.statusCode), + r.statusMessage match { + case null => None + case _ => Some(r.statusMessage) + }, + `__ELAPSED_TIME__` = System.currentTimeMillis() - startTime, + `__URL__` = url, + `__DATA__` = data + ) + Try{request(data,s)} match { - case Success(r) => ResponseBatch( - rows.map(row => row.getAs[Any](Alias.IdCol).toString()), - Some(r.text()), - r.headers, - Some(r.statusCode), - r.statusMessage match { - case null => None - case _ => Some(r.statusMessage) - }, - `__ELAPSED_TIME__` = System.currentTimeMillis() - startTime, - `__URL__` = url, - `__DATA__` = data - ) + case Success(r) => getResponse(r) + case Failure(re: RequestFailedException) => getResponse(re.response) case Failure(f) => { logger.error("Almaren HTTP Batch Request Error", f) ResponseBatch( - rows.map(row => row.getAs[Any](Alias.IdCol).toString()), - `__ERROR__` = Some(f.getMessage()), + rows.map(row => row.getAs[Any](Alias.IdCol).toString), + `__ERROR__` = Some(f.getMessage), `__ELAPSED_TIME__` = System.currentTimeMillis() - startTime, `__URL__` = url, `__DATA__` = data