
Commit

Merge pull request #21 from sharath-sg2706/spark-3.3
Updating master to spark-3.3
badrinathpatchikolla authored Oct 18, 2022
2 parents 760c9a3 + 0ad1d84 commit fae381f
Showing 6 changed files with 101 additions and 61 deletions.
1 change: 0 additions & 1 deletion .bsp/sbt.json

This file was deleted.

49 changes: 49 additions & 0 deletions .github/workflows/http.almaren-githubactions.yml
@@ -0,0 +1,49 @@
name: HTTP.Almaren
on: [push, pull_request]

jobs:
  Build:
    runs-on: ubuntu-20.04
    services:
      postgres:
        image: postgres:13.4
        env:
          POSTGRES_PASSWORD: postgres
          POSTGRES_HOST_AUTH_METHOD: trust
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - name: Check out repository code
        uses: actions/checkout@v2
      - name: Setup JDK
        uses: actions/setup-java@v3
        with:
          distribution: temurin
          java-version: 8
          cache: sbt
      - name: Setup web environment
        run: |
          curl -L http://cpanmin.us | perl - --sudo App::cpanminus
          cpanm --local-lib=~/perl5 local::lib && eval $(perl -I ~/perl5/lib/perl5/ -Mlocal::lib)
          cpanm Mojolicious
          cpanm JSON::Parse
          perl src/test/resources/script/mock_api.pl daemon -m dev -l http://\*:3000 &
      - name: Build and test Scala versions
        run: |
          PGPASSWORD="postgres" psql -c 'create database almaren;' -U postgres -h localhost
          PGPASSWORD="postgres" psql -c "ALTER USER postgres PASSWORD 'foo' ;" -U postgres -h localhost
          PGPASSWORD="postgres" psql -c 'create role runner;' -U postgres -h localhost
          PGPASSWORD="postgres" psql -c 'ALTER ROLE "runner" WITH LOGIN SUPERUSER INHERIT CREATEDB CREATEROLE REPLICATION;' -U postgres -h localhost
          sbt ++2.12.15 test
          sbt ++2.13.9 test
          rm -rf "$HOME/.ivy2/local" || true
          find $HOME/Library/Caches/Coursier/v1 -name "ivydata-*.properties" -delete || true
          find $HOME/.ivy2/cache -name "ivydata-*.properties" -delete || true
          find $HOME/.cache/coursier/v1 -name "ivydata-*.properties" -delete || true
          find $HOME/.sbt -name "*.lock" -delete || true
          killall -9 perl
37 changes: 0 additions & 37 deletions .travis.yml

This file was deleted.

40 changes: 33 additions & 7 deletions README.md
@@ -1,15 +1,24 @@
# HTTP Connector

[![Build Status](https://travis-ci.com/modakanalytics/http.almaren.svg?token=TEB3zRDqVUuChez9334q&branch=master)](https://travis-ci.com/modakanalytics/http.almaren)
[![Build Status](https://github.com/music-of-the-ainur/http.almaren/actions/workflows/http.almaren-githubactions.yml/badge.svg)](https://github.com/music-of-the-ainur/http.almaren/actions/workflows/http.almaren-githubactions.yml)

To add http.almaren dependency to your sbt build:
```
libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.4-$SPARK_VERSION"
libraryDependencies += "com.github.music-of-the-ainur" %% "http-almaren" % "1.2.5-3.3"
```

To run in spark-shell:
For Scala 2.12:
```
spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.3-$SPARK_VERSION,com.github.music-of-the-ainur:http-almaren_2.12:1.2.4-$SPARK_VERSION"
spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.12:0.9.8-3.3,com.github.music-of-the-ainur:http-almaren_2.12:1.2.5-3.3"
```
For Scala 2.13:
```
spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almaren-framework_2.13:0.9.8-3.3,com.github.music-of-the-ainur:http-almaren_2.13:1.2.5-3.3"
```
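
Once these packages are on the classpath, the connector is pulled in through imports. The snippet below is only a minimal sketch of session setup; the imports are the ones used by this repository's test suite, and `"http-almaren"` is just an application name:

```scala
// Imports used in this project's tests: they expose the Almaren builder DSL
// and the .http() method on it.
import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.http.HTTPConn.HTTPImplicit

val almaren = Almaren("http-almaren") // application name, as in the tests
val spark = almaren.spark             // underlying SparkSession
```
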
## Table of Contents
- [Maven / Ivy Package Usage](#maven--ivy-package-usage)
- [Methods](#methods)
* [HTTP](#http)
+ [Example](#example)
@@ -31,6 +40,23 @@ spark-shell --master "local[*]" --packages "com.github.music-of-the-ainur:almare
- [Batch Delimiter](#batch-delimiter)
- [Examples](#examples)


#### Maven / Ivy Package Usage
The connector is also available from the
[Maven Central](https://mvnrepository.com/artifact/com.github.music-of-the-ainur)
repository. It can be used with the `--packages` option or the
`spark.jars.packages` configuration property; a short `spark.jars.packages` sketch follows the table. Use one of the following values:

| Version                    | Connector Artifact                                           |
|----------------------------|--------------------------------------------------------------|
| Spark 3.3.x and Scala 2.13 | `com.github.music-of-the-ainur:http-almaren_2.13:1.2.5-3.3`  |
| Spark 3.3.x and Scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.5-3.3`  |
| Spark 3.2.x and Scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.5-3.2`  |
| Spark 3.1.x and Scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.5-3.1`  |
| Spark 2.4.x and Scala 2.12 | `com.github.music-of-the-ainur:http-almaren_2.12:1.2.5-2.4`  |
| Spark 2.4.x and Scala 2.11 | `com.github.music-of-the-ainur:http-almaren_2.11:1.2.5-2.4`  |
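
The same coordinates work with `spark.jars.packages` when the session is built programmatically. A minimal sketch, assuming Spark 3.3.x with Scala 2.12 (swap the coordinate for the matching row above); the application name is illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: resolves the Spark 3.3 / Scala 2.12 artifact from the table
// above at session startup via Ivy/Maven resolution.
val spark = SparkSession.builder()
  .appName("http-almaren-example") // illustrative name
  .config("spark.jars.packages",
    "com.github.music-of-the-ainur:http-almaren_2.12:1.2.5-3.3")
  .getOrCreate()
```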


## Methods

### HTTP
@@ -81,7 +107,7 @@ val df = almaren.builder
to_json(named_struct('data',named_struct('name',firstName + " " + lastName))) as __DATA__
FROM DATA""")
.http(method = "POST", threadPoolSize = 10, batchSize = 10000)
.deserializer("JSON","__BODY__",httpOutpustSchema)
.deserializer("JSON","__BODY__",httpOutpustSchema).alias("TABLE")
.sql("""SELECT
T.origin,
D.firstName,
Expand All @@ -90,7 +116,7 @@ val df = almaren.builder
T.url,
T.__ERROR__ as error,
T.__ELAPSED_TIME__ as request_time
FROM __TABLE__ T JOIN DATA D ON d.id = t.__ID__""")
FROM TABLE T JOIN DATA D ON d.id = t.__ID__""")
.batch

df.show(false)
@@ -115,7 +141,7 @@ Output:
T.url,
T.__ERROR__ as error,
T.__ELAPSED_TIME__ as request_time
FROM __TABLE__ T JOIN DATA D ON d.id = t.__ID__}
FROM TABLE T JOIN DATA D ON d.id = t.__ID__}
+-----------+---------+--------+---+-----------+---------------------------------------------+-----+------------+
|origin |firstName|lastName|age|status_code|url |error|request_time|
14 changes: 8 additions & 6 deletions build.sbt
@@ -2,10 +2,12 @@ ThisBuild / name := "http.almaren"
ThisBuild / organization := "com.github.music-of-the-ainur"

lazy val scala212 = "2.12.15"
lazy val scala213 = "2.13.9"

ThisBuild / scalaVersion := scala212
crossScalaVersions := Seq(scala212,scala213)
ThisBuild / scalaVersion := scala213

val sparkVersion = "3.1.3"
val sparkVersion = "3.3.0"
val majorVersionReg = "([0-9]+\\.[0-9]+).{0,}".r

val majorVersionReg(majorVersion) = sparkVersion
@@ -15,10 +17,10 @@ scalacOptions ++= Seq("-deprecation", "-feature")
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.3-${majorVersion}" % "provided",
"com.lihaoyi" %% "requests" % "0.7.0",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
"org.scalatest" %% "scalatest" % "3.0.5" % "test"
"com.github.music-of-the-ainur" %% "almaren-framework" % s"0.9.8-${majorVersion}" % "provided",
"com.lihaoyi" %% "requests" % "0.7.1",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
"org.scalatest" %% "scalatest" % "3.2.14" % "test"
)

enablePlugins(GitVersioning)
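
A note on the build change above: `majorVersionReg` extracts the Spark major.minor from `sparkVersion`, which is how the `-3.3` suffix on the almaren-framework dependency (and on this connector's own version) is derived. A standalone sketch of that extraction:

```scala
// Same pattern as in build.sbt: a regex with one capture group, bound through
// pattern matching on the val definition.
val sparkVersion = "3.3.0"
val majorVersionReg = "([0-9]+\\.[0-9]+).{0,}".r
val majorVersionReg(majorVersion) = sparkVersion

println(majorVersion)           // prints "3.3"
println(s"0.9.8-$majorVersion") // suffix used for the almaren-framework dependency
```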
@@ -7,8 +7,9 @@ import com.github.music.of.the.ainur.almaren.Almaren
import com.github.music.of.the.ainur.almaren.builder.Core.Implicit
import com.github.music.of.the.ainur.almaren.http.HTTPConn.HTTPImplicit
import org.apache.spark.sql.Row
import org.scalatest.funsuite.AnyFunSuite

class Test extends FunSuite with BeforeAndAfter {
class Test extends AnyFunSuite with BeforeAndAfter {
val almaren = Almaren("http-almaren")

val spark: SparkSession = almaren.spark
@@ -73,8 +74,8 @@ class Test extends FunSuite with BeforeAndAfter {
}

tempDf
.deserializer("JSON", "__BODY__", Some(schema))
.sql("select __ID__,data,__STATUS_CODE__ as status_code,__ELAPSED_TIME__ as elapsed_time from __TABLE__")
.deserializer("JSON", "__BODY__", Some(schema)).alias("TABLE")
.sql("select __ID__,data,__STATUS_CODE__ as status_code,__ELAPSED_TIME__ as elapsed_time from TABLE").alias("TABLE1")
.dsl(
"""__ID__$__ID__:StringType
|elapsed_time$elapsed_time:LongType
@@ -83,15 +84,15 @@ class Test extends FunSuite with BeforeAndAfter {
|data.age$age:LongType
|data.salary$salary:DoubleType
|status_code$status_code:IntegerType""".stripMargin
)
).alias("TABLE2")
.sql(
"""select T.__ID__ as id ,
full_name ,
country,
age,
salary,
status_code
from __TABLE__ T join PERSON_DATA P on T.__ID__ = P.__ID__""")
from TABLE2 T join PERSON_DATA P on T.__ID__ = P.__ID__""")
.batch
}

@@ -105,17 +106,17 @@
method = "POST",
batchSize = 3,
batchDelimiter = (rows: Seq[Row]) => s"""[${rows.map(row => row.getAs[String](Alias.DataCol)).mkString(",")}]""")
.deserializer("JSON", "__BODY__", Some("`data` ARRAY<STRUCT<`country`: STRING, `first_name`: STRING, `last_name`: STRING>>"))
.sql("select explode(arrays_zip(__ID__, data)) as vars , __STATUS_CODE__ as status_code,__ELAPSED_TIME__ as elapsed_time from __TABLE__")
.sql("select vars.__ID__ as __ID__ ,vars.data as data ,status_code, elapsed_time from __TABLE__ ")
.deserializer("JSON", "__BODY__", Some("`data` ARRAY<STRUCT<`country`: STRING, `first_name`: STRING, `last_name`: STRING>>")).alias("TABLE")
.sql("select explode(arrays_zip(__ID__, data)) as vars , __STATUS_CODE__ as status_code,__ELAPSED_TIME__ as elapsed_time from TABLE").alias("TABLE1")
.sql("select vars.__ID__ as __ID__ ,vars.data as data ,status_code, elapsed_time from TABLE1 ").alias("TABLE2")
.dsl(
"""__ID__$__ID__:StringType
|data.country$country:StringType
|data.first_name$first_name:StringType
|data.last_name$last_name:StringType
|status_code$status_code:IntegerType
|elapsed_time$elapsed_time:LongType""".stripMargin)
.sql("select __TABLE__.__ID__ as id ,first_name,last_name,country,status_code from __TABLE__ inner join BATCH_DATA on __TABLE__.__ID__ = BATCH_DATA.__ID__ ")
|elapsed_time$elapsed_time:LongType""".stripMargin).alias("TABLE3")
.sql("select TABLE3.__ID__ as id ,first_name,last_name,country,status_code from TABLE3 inner join BATCH_DATA on TABLE3.__ID__ = BATCH_DATA.__ID__ ")
.batch

val getBatchDf = spark.read.parquet("src/test/resources/data/httpBatch.parquet")
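
A recurring change in the README and test diffs above is replacing the implicit `__TABLE__` placeholder with explicit `.alias(...)` names that later `.sql(...)` steps reference. A minimal sketch of the pattern, modelled on the test code above (`tempDf`, `schema`, and `PERSON_DATA` stand in for values defined earlier in that test):

```scala
// Sketch only: each .alias(...) names the previous step's output so the next
// .sql(...) can reference it by that name instead of the old __TABLE__ token.
tempDf
  .deserializer("JSON", "__BODY__", Some(schema)).alias("PARSED")
  .sql("select __ID__, data, __STATUS_CODE__ as status_code from PARSED").alias("FLAT")
  .sql("select F.__ID__ as id, F.data, F.status_code from FLAT F join PERSON_DATA P on F.__ID__ = P.__ID__")
  .batch
```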
