Skip to content

Commit

Permalink
Merge pull request #51 from rohithvangapalli/spark-3.3
Browse files Browse the repository at this point in the history
Updated the spark and almaren version
  • Loading branch information
badrinathpatchikolla authored Sep 21, 2023
2 parents 2bb3d93 + e5b4e7e commit 5e6c218
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 128 deletions.
136 changes: 10 additions & 126 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
To add http.almaren dependency to your sbt build:

```
libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.8-3.3"
libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.9-3.3"
```

To run in spark-shell:

```
spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.9-3.3,com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.3"
spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.10-3.3,com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.3"
```

## Table of Contents
Expand Down Expand Up @@ -43,14 +43,14 @@ repository.

| versions | Connector Artifact |
|----------------------------|-------------------------------------------------------------|
| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.8-3.4` |
| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.4` |
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.8-3.3` |
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.3` |
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.2` |
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-3.1` |
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.8-2.4` |
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:http-almaren_2.11:1.2.8-2.4` |
| Spark 3.4.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.9-3.4` |
| Spark 3.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.4` |
| Spark 3.3.x and scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.9-3.3` |
| Spark 3.3.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.3` |
| Spark 3.2.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.2` |
| Spark 3.1.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-3.1` |
| Spark 2.4.x and scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.9-2.4` |
| Spark 2.4.x and scala 2.11 | `com.github.music-of-the-ainur:http-almaren_2.11:1.2.9-2.4` |

## Methods

Expand Down Expand Up @@ -500,119 +500,3 @@ How to concatenate by new line:
```scala
(rows: Seq[Row]) => rows.map(row => row.getAs[String](Alias.DataCol)).mkString("\n")
```

### HTTP Row

It will initiate an HTTP request for each Row, extracting headers, parameters, and hidden parameters from each Row.

```
$ curl -X PUT -H "Authorization: {SESSION_ID}" \
-H "Content-Type: text/csv" \
-H "Accept: text/csv" \
--data-binary @"filename" \
https://localhost/objects/documents/batch
```

#### Example

```scala
import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.http.HTTPConn.HTTPImplicit
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
import scala.collection.JavaConverters.asScalaIteratorConverter
import spark.implicits._

val almaren = Almaren("http-almaren")

val df = Seq(
("John", "Smith", "London"),
("David", "Jones", "India"),
("Michael", "Johnson", "Indonesia"),
("Chris", "Lee", "Brazil"),
("Mike", "Brown", "Russia")
).toDF("first_name", "last_name", "country").coalesce(1)

df.createOrReplaceTempView("person_info")
val requestSchema = StructType(Seq(
StructField("__URL__", StringType),
StructField("__DATA__", StringType),
StructField("__REQUEST_HEADERS__", MapType(StringType, StringType)),
StructField("__REQUEST_PARAMS__", MapType(StringType, StringType)),
StructField("__REQUEST_HIDDEN_PARAMS__", MapType(StringType, StringType))
))

//Constructing the request dataframe by generating necessary input columns from each row in the input dataframe.
val requestRows: Seq[Row] = df.toLocalIterator.asScala.toList.map(row => {
val firstName = row.getAs[String]("first_name")
val lastName = row.getAs[String]("last_name")
val country = row.getAs[String]("country")
val url = s"http://localhost:3000/fireshots/getInfo"
val headers = scala.collection.mutable.Map[String, String]()
headers.put("data", firstName)
val params = scala.collection.mutable.Map[String, String]()
params.put("params", lastName)
val hiddenParams = scala.collection.mutable.Map[String, String]()
hiddenParams.put("hidden_params", country)

Row(url,
s"""{"first_name" : "$firstName","last_name":"$lastName","country":"$country"} """,
headers,
params,
hiddenParams
)
})

val requestDataframe = spark.createDataFrame(spark.sparkContext.parallelize(requestRows), requestSchema)


val responseDf = almaren.builder
.sourceDataFrame(requestDataframe)
.sqlExpr("monotonically_increasing_id() as __ID__", "__DATA__", "__URL__", "__REQUEST_HEADERS__", "__REQUEST_PARAMS__", "__REQUEST_HIDDEN_PARAMS__")
.httpRow(method = "POST")
.batch

responseDf.show(false)
```

#### Parameters

| Parameter | Description | Type |
|----------------|-------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------|
| headers | HTTP headers | Map[String,String] |
| params | HTTP params | Map[String,String] |
| hiddenParams | HTTP params which are hidden (not exposed in logs) | Map[String,String] |
| method | HTTP Method | String |
| requestHandler | Closure to handle HTTP request | (Row,Session,String,Map[String,String],String) => requests.Respons |
| session | Closure to handle HTTP sessions | () = requests.Session |
| connectTimeout | Timeout in ms to keep the connection keep-alive, it's recommended to keep this number high | Int |
| readTimeout | Maximum number of ms to perform a single HTTP request | Int |
| threadPoolSize | How many connections in parallel for each executor. parallelism = number of excutors * number of cores * threadPoolSize | Int |
| batchSize | How many records a single thread will process | Int |

| Parameters | Mandatory | Description | Column Type |
|-------------------------------|-----------|------------------------------------------------------------------------------------|--------------------|
| \_\_ID\_\_ | Yes | This field will be in response of http.almaren component, it's useful to join data | String |
| \_\_URL\_\_ | Yes | Used to perform the HTTP request | String |
| \_\_DATA\_\_ | No | Data Content, used in POST/PUT Method HTTP requests | String |
| \_\_REQUEST_HEADERS\_\_ | Yes | HTTP headers | Map[String,String] |
| \_\_REQUEST_PARAMS\_\_ | Yes | HTTP params | Map[String,String] |
| \_\_REQUEST_HIDDEN_PARAMS\_\_ | Yes | HTTP params which are hidden (not exposed in logs) | Map[String,String] |

| Parameters | Description |
|-------------------------------|------------------------------------------------------------|
| \_\_ID\_\_ | Custom ID , This field will be useful to join data |
| \_\_BODY\_\_ | HTTP response |
| \_\_HEADER\_\_ | HTTP response header |
| \_\_STATUS_CODE\_\_ | HTTP response code |
| \_\_STATUS_MSG\_\_ | HTTP response message |
| \_\_ERROR\_\_ | Java Exception |
| \_\_ELAPSED_TIME\_\_ | Request time in ms |
| \_\_URL\_\_ | HTTP request URL |
| \_\_DATA\_\_ | Data Content, used in POST/PUT Method HTTP requests |
| \_\_REQUEST_HEADERS\_\_ | HTTP Request headers |
| \_\_REQUEST_PARAMS\_\_ | HTTP Request params |
| \_\_REQUEST_HIDDEN_PARAMS\_\_ | HTTP Request params which are hidden (not exposed in logs) |


4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ lazy val scala213 = "2.13.9"
crossScalaVersions := Seq(scala212,scala213)
ThisBuild / scalaVersion := scala213

val sparkVersion = "3.3.0"
val sparkVersion = "3.3.3"
val majorVersionReg = "([0-9]+\\.[0-9]+).{0,}".r

val majorVersionReg(majorVersion) = sparkVersion
Expand All @@ -17,7 +17,7 @@ scalacOptions ++= Seq("-deprecation", "-feature")
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.9-${majorVersion}" % "provided",
"com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.10-${majorVersion}" % "provided",
"com.lihaoyi" %% "requests" % "0.7.1",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
"org.scalatest" %% "scalatest" % "3.2.14" % "test"
Expand Down

0 comments on commit 5e6c218

Please sign in to comment.