-
Notifications
You must be signed in to change notification settings - Fork 0
/
spark_2.py
25 lines (20 loc) · 891 Bytes
/
spark_2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from pyspark import SparkContext
import os
def _word_counts(sc, path):
    """Return an RDD of ``(word, count)`` pairs for the text file at *path*.

    Lines are split on whitespace; each token counts as one word.
    """
    return (
        sc.textFile(path)
        .flatMap(lambda line: line.split())
        .map(lambda word: (word, 1))
        .reduceByKey(lambda a, b: a + b)
    )


def main():
    """Count words across a seed file and every file under the input tree.

    A per-file ``(word, count)`` RDD is built for the seed file (the second
    entry of the listing of the spark_2 directory) and for each file found
    by walking the arxiv output directory.  All per-file RDDs are unioned,
    the counts are merged per word, and the result is written with
    ``saveAsTextFile``.

    Files whose RDD cannot be constructed are reported and skipped; the
    Spark context is always stopped, even when the job fails.
    """
    seed_dir = '/Akamai_scratch/_team_joe_/spark_2'
    indir = '/Akamai_scratch/arxiv/outdir3'
    out_path = '/Akamai_scratch/_team_joe_/spark_2/all.wordcount.csv'

    sc = SparkContext(appName='SparkWordCount')
    try:
        path = os.listdir(seed_dir)
        print(path)

        # The name comes from listing seed_dir, so it must be resolved
        # against seed_dir (not the current working directory).
        # NOTE(review): os.listdir order is arbitrary, so path[1] is not a
        # stable choice of seed file - confirm which file is intended.
        per_file = [_word_counts(sc, os.path.join(seed_dir, path[1]))]

        for root, dirs, filenames in os.walk(indir):
            for f in filenames:
                full_path = os.path.join(root, f)
                try:
                    per_file.append(_word_counts(sc, full_path))
                except Exception as exc:
                    # Best effort: one unreadable file must not abort the
                    # whole job, but the skip should be visible.
                    print('Skipping %s: %s' % (full_path, exc))

        # RDDs are immutable: union() returns a new RDD, so the combined
        # RDD has to be captured.  A final reduceByKey merges the counts of
        # words that occur in more than one file.
        counts = sc.union(per_file).reduceByKey(lambda a, b: a + b)
        counts.saveAsTextFile(out_path)
    finally:
        sc.stop()
# Run the word-count job only when executed as a script, not on import.
if __name__ == '__main__':
    main()