Repository for PySpark Lab Session at ACM Winter School - 2020
Login to the IISc cluster (10.24.24.2) using SSH. Windows users can use Putty. Please use the username allocated for you in the following google sheet:
https://docs.google.com/spreadsheets/d/1CMKjWcLXCFOsa-kCa8KoplGjQDjKFRhpjoWYoKJAc5I/edit?usp=sharing
The password is acmws@iisc
Visit http://10.24.24.2:8088 in your browser. Find the following information: Total memory, Total cores, Total active nodes, Memory per node, Cores per node and Scheduler type.
Run the following HDFS commands one by one in your terminal to inspect the dataset and observe the output,
# List all the files
hdfs dfs -ls /ml/small
# Print first few lines of the files
hdfs dfs -head /ml/small/movies.csv
hdfs dfs -head /ml/small/ratings.csv
hdfs dfs -head /ml/small/tags.csv
hdfs dfs -head /ml/small/links.csv
Once logged into the cluster, run the following command:
pyspark --master yarn --deploy-mode client --conf spark.ui.port=5001 spark.yarn.archive=hdfs:///spark-libs.jar --num-executors 1 --executor-cores 2 --executor-memory 2g
and you should get an output similar to the following,
Python 2.7.5 (default, Apr 11 2018, 07:36:10)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
2020-01-13 23:19:13 WARN NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2020-01-13 23:19:15 WARN Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.0
/_/
Using Python version 2.7.5 (default, Apr 11 2018 07:36:10)
SparkSession available as 'spark'.
>>>
Enter the following statements in PySpark terminal to read and cache the datset in memory,
m = sc.textFile("hdfs:///ml/full/movies.csv").cache()
r = sc.textFile("hdfs:///ml/full/ratings.csv").cache()
t = sc.textFile("hdfs:///ml/full/tags.csv").cache()
l = sc.textFile("hdfs:///ml/full/links.csv").cache()
tu = t.map(lambda l : l.split(",")[0]).distinct()
print 'Number of Users: ', tu.count()-1
ru = r.map(lambda l : l.split(",")[0]).distinct()
print 'Number of Users: ', tu.intersection(ru).count() - 1
# Unicode reader
# Reference: https://docs.python.org/2/library/csv.html#examples
import csv
def unicode_csv_reader(unicode_csv_data, dialect=csv.excel, **kwargs):
csv_reader = csv.reader(utf_8_encoder(unicode_csv_data), dialect=dialect, **kwargs)
for row in csv_reader:
yield [unicode(cell, 'utf-8') for cell in row]
def utf_8_encoder(unicode_csv_data):
for line in unicode_csv_data:
yield line.encode('utf-8')
mp = m.map(lambda l: tuple(unicode_csv_reader([l,]))[0])
mp = mp.filter(lambda l: l[0] != u'movieId').map(lambda l: (len(l[2].split('|')), l))
mc = mp.map(lambda l: l[0]).max()
mcm = mp.lookup(mc)
print mc, mcm
mp = m.map(lambda l: tuple(unicode_csv_reader([l,]))[0])
mgs = mp.flatMap(lambda l : l[2].split("|")).map(lambda g : (g, 1)).reduceByKey(lambda a, b: a + b)
msg = mgs.map(lambda (g,s) : (s,g))
gmin = msg.min()
gmax = msg.max()
print 'Genre with min films is',gmin[1],'with count',gmin[0]
print 'Genre with max films is',gmax[1],'with count',gmax[0]
Exit the PySpark terminal by entering quit()
.