diff --git a/README.md b/README.md index 2e7b686..e0f3591 100644 --- a/README.md +++ b/README.md @@ -5,13 +5,13 @@ To add http.almaren dependency to your sbt build: ``` -libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.8-2.4" +libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.9-2.4" ``` To run in spark-shell: ``` -spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.11:0.9.9-2.4,com.github.music-of-the-ainur:http-almaren_2.11:1.2.8-2.4" +spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.11:0.9.10-2.4,com.github.music-of-the-ainur:http-almaren_2.11:1.2.9-2.4" ``` ## Table of Contents @@ -43,14 +43,14 @@ repository. | versions | Connector Artifact | |----------------------------|-------------------------------------------------------------| -| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.8-3.4` | -| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.4` | -| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.8-3.3` | -| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.3` | -| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.2` | -| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.1` | -| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-2.4` | -| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:http-almaren_2.11:1.2.8-2.4` | +| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.9-3.4` | +| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.4` | +| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.9-3.3` | +| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.3` | +| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.2` | +| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.1` | +| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-2.4` | +| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:http-almaren_2.11:1.2.9-2.4` | ## Methods @@ -501,118 +501,3 @@ How to concatenate by new line: (rows: Seq[Row]) => rows.map(row => row.getAs[String](Alias.DataCol)).mkString("\n") ``` -### HTTP Row - -It will initiate an HTTP request for each Row, extracting headers, parameters, and hidden parameters from each Row. - -``` -$ curl -X PUT -H "Authorization: {SESSION_ID}" \ --H "Content-Type: text/csv" \ --H "Accept: text/csv" \ ---data-binary @"filename" \ -https://localhost/objects/documents/batch -``` - -#### Example - -```scala - import com.github.music.of.the.ainur.almaren.Almaren -import com.github.music.of.the.ainur.almaren.builder.Core.Implicit -import com.github.music.of.the.ainur.almaren.http.HTTPConn.HTTPImplicit -import org.apache.spark.sql.Row -import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType} -import scala.collection.JavaConverters.asScalaIteratorConverter -import spark.implicits._ - -val almaren = Almaren("http-almaren") - -val df = Seq( - ("John", "Smith", "London"), - ("David", "Jones", "India"), - ("Michael", "Johnson", "Indonesia"), - ("Chris", "Lee", "Brazil"), - ("Mike", "Brown", "Russia") -).toDF("first_name", "last_name", "country").coalesce(1) - -df.createOrReplaceTempView("person_info") -val requestSchema = StructType(Seq( - StructField("__URL__", StringType), - StructField("__DATA__", StringType), - StructField("__REQUEST_HEADERS__", MapType(StringType, StringType)), - StructField("__REQUEST_PARAMS__", MapType(StringType, StringType)), - StructField("__REQUEST_HIDDEN_PARAMS__", MapType(StringType, StringType)) -)) - -//Constructing the request dataframe by generating necessary input columns from each row in the input dataframe. -val requestRows: Seq[Row] = df.toLocalIterator.asScala.toList.map(row => { - val firstName = row.getAs[String]("first_name") - val lastName = row.getAs[String]("last_name") - val country = row.getAs[String]("country") - val url = s"http://localhost:3000/fireshots/getInfo" - val headers = scala.collection.mutable.Map[String, String]() - headers.put("data", firstName) - val params = scala.collection.mutable.Map[String, String]() - params.put("params", lastName) - val hiddenParams = scala.collection.mutable.Map[String, String]() - hiddenParams.put("hidden_params", country) - - Row(url, - s"""{"first_name" : "$firstName","last_name":"$lastName","country":"$country"} """, - headers, - params, - hiddenParams - ) -}) - -val requestDataframe = spark.createDataFrame(spark.sparkContext.parallelize(requestRows), requestSchema) - - -val responseDf = almaren.builder - .sourceDataFrame(requestDataframe) - .sqlExpr("monotonically_increasing_id() as __ID__", "__DATA__", "__URL__", "__REQUEST_HEADERS__", "__REQUEST_PARAMS__", "__REQUEST_HIDDEN_PARAMS__") - .httpRow(method = "POST") - .batch - -responseDf.show(false) -``` - -#### Parameters - -| Parameter | Description | Type | -|----------------|-------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------| -| headers | HTTP headers | Map[String,String] | -| params | HTTP params | Map[String,String] | -| hiddenParams | HTTP params which are hidden (not exposed in logs) | Map[String,String] | -| method | HTTP Method | String | -| requestHandler | Closure to handle HTTP request | (Row,Session,String,Map[String,String],String) => requests.Respons | -| session | Closure to handle HTTP sessions | () = requests.Session | -| connectTimeout | Timeout in ms to keep the connection keep-alive, it's recommended to keep this number high | Int | -| readTimeout | Maximum number of ms to perform a single HTTP request | Int | -| threadPoolSize | How many connections in parallel for each executor. parallelism = number of excutors * number of cores * threadPoolSize | Int | -| batchSize | How many records a single thread will process | Int | - -| Parameters | Mandatory | Description | Column Type | -|-------------------------------|-----------|------------------------------------------------------------------------------------|--------------------| -| \_\_ID\_\_ | Yes | This field will be in response of http.almaren component, it's useful to join data | String | -| \_\_URL\_\_ | Yes | Used to perform the HTTP request | String | -| \_\_DATA\_\_ | No | Data Content, used in POST/PUT Method HTTP requests | String | -| \_\_REQUEST_HEADERS\_\_ | Yes | HTTP headers | Map[String,String] | -| \_\_REQUEST_PARAMS\_\_ | Yes | HTTP params | Map[String,String] | -| \_\_REQUEST_HIDDEN_PARAMS\_\_ | Yes | HTTP params which are hidden (not exposed in logs) | Map[String,String] | - -| Parameters | Description | -|-------------------------------|------------------------------------------------------------| -| \_\_ID\_\_ | Custom ID , This field will be useful to join data | -| \_\_BODY\_\_ | HTTP response | -| \_\_HEADER\_\_ | HTTP response header | -| \_\_STATUS_CODE\_\_ | HTTP response code | -| \_\_STATUS_MSG\_\_ | HTTP response message | -| \_\_ERROR\_\_ | Java Exception | -| \_\_ELAPSED_TIME\_\_ | Request time in ms | -| \_\_URL\_\_ | HTTP request URL | -| \_\_DATA\_\_ | Data Content, used in POST/PUT Method HTTP requests | -| \_\_REQUEST_HEADERS\_\_ | HTTP Request headers | -| \_\_REQUEST_PARAMS\_\_ | HTTP Request params | -| \_\_REQUEST_HIDDEN_PARAMS\_\_ | HTTP Request params which are hidden (not exposed in logs) | - - diff --git a/build.sbt b/build.sbt index c9a4a26..76d61e9 100644 --- a/build.sbt +++ b/build.sbt @@ -17,7 +17,7 @@ scalacOptions ++= Seq("-deprecation", "-feature") libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % sparkVersion % "provided", "org.apache.spark" %% "spark-sql" % sparkVersion % "provided", - "com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.9-${majorVersion}" % "provided", + "com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.10-${majorVersion}" % "provided", "com.lihaoyi" %% "requests" % "0.7.1", "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", "org.scalatest" %% "scalatest" % "3.2.14" % "test"