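"""Spark Streaming job: read JSON activity records from an Amazon Kinesis
stream, apply a fixed schema to each micro-batch, and append the result to a
Redshift table via the spark-redshift connector."""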
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
import json
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Kinesis and Spark configuration; the stream, endpoint, and app names are
# placeholders to replace with values for your environment.
aws_region = 'us-east-1'
kinesis_stream_name = 'stream_name'
kinesis_endpoint = 'https://kinesis.us-east-1.amazonaws.com/'
kinesis_app_name = 'app_name'
kinesis_initial_position = InitialPositionInStream.LATEST
kinesis_checkpoint_interval = 5  # seconds
spark_batch_interval = 5  # seconds
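

# The original script calls getSqlContextInstance() without defining it. This
# is a minimal sketch of the singleton helper from the Spark Streaming
# programming guide: it lazily creates one SQLContext per process and reuses
# it across micro-batches.
def getSqlContextInstance(spark_context):
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']
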
if __name__ == "__main__":
    spark_context = SparkContext(appName=kinesis_app_name)
    spark_streaming_context = StreamingContext(spark_context, spark_batch_interval)
    # createStream returns a DStream of the raw record payloads as UTF-8 strings.
    kinesis_dstream = KinesisUtils.createStream(
        spark_streaming_context, kinesis_app_name, kinesis_stream_name,
        kinesis_endpoint, aws_region, kinesis_initial_position,
        kinesis_checkpoint_interval)
    kinesis_dstream.pprint()

    # Each record is expected to be one JSON object with the activity_log
    # fields defined in process() below.
    json_dstream = kinesis_dstream.map(json.loads)

    def process(time, rdd):
        print("========= %s =========" % str(time))
        # Skip empty micro-batches so we don't run an empty Redshift load.
        if rdd.isEmpty():
            return
        try:
            sqlContext = getSqlContextInstance(rdd.context)
            schema = StructType([
                StructField('user_id', IntegerType(), True),
                StructField('username', StringType(), True),
                StructField('first_name', StringType(), True),
                StructField('surname', StringType(), True),
                StructField('age', IntegerType(), True),
            ])
            df = sqlContext.createDataFrame(rdd, schema)
            df.registerTempTable("activity_log")
            # spark-redshift stages each batch in the S3 tempdir and loads it
            # into Redshift with a COPY; URL and credentials are placeholders.
            df.write \
                .format("com.databricks.spark.redshift") \
                .option("url", "jdbc:redshift://redshiftURL.com:5439/database?user=USERNAME&password=PASSWORD") \
                .option("dbtable", "activity_log") \
                .option("tempdir", "s3n://spark-temp-data/") \
                .mode("append") \
                .save()
        except Exception as e:
            # Log the failure and keep the streaming job alive; the failed
            # batch is dropped rather than retried.
            print(e)

    json_dstream.foreachRDD(process)

    spark_streaming_context.start()
    spark_streaming_context.awaitTermination()  # blocks until stopped or failed
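
# A typical launch, as a sketch: the Kinesis ASL and spark-redshift packages
# must be on the classpath, and the versions below are assumptions to match
# to your Spark build and Scala version.
#
#   spark-submit \
#       --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.2.0,com.databricks:spark-redshift_2.11:3.0.0-preview1 \
#       stream.py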