Skip to content

Commit

Permalink
adding group files and size options
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin Glinski committed Nov 5, 2018
1 parent cf4b6e3 commit 9b68168
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 5 deletions.
23 changes: 19 additions & 4 deletions aws_glue_etl_docker/glueshim.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pyspark import SparkConf, SparkContext, SQLContext
from pprint import pprint

def _load_data(filePaths, dataset_name, spark_context):
def _load_data(filePaths, dataset_name, spark_context, groupfiles, groupsize):
    """Read the JSON at filePaths through a SQLContext and return the result.

    filePaths -- path(s) handed straight to sqlContext.read.json
    dataset_name -- accepted but not used by this implementation
    spark_context -- spark context used to construct the SQLContext
    groupfiles -- accepted but not used by this implementation
    groupsize -- accepted but not used by this implementation
    """
    return SQLContext(spark_context).read.json(filePaths)

Expand Down Expand Up @@ -51,9 +51,18 @@ def _finish(self):
from awsglue.job import Job
import boto3

def _load_data(file_paths, dataset_name, context):
def _load_data(file_paths, dataset_name, context, groupfiles, groupsize):

connection_options = {'paths': file_paths}

if groupfiles != None:
connection_options["groupFiles"] = groupfiles

if groupsize != None:
connection_options["groupSize"] = groupsize

glue0 = context.create_dynamic_frame.from_options(connection_type='s3',
connection_options={'paths': file_paths},
connection_options=connection_options,
format='json',
transformation_ctx=dataset_name)

Expand Down Expand Up @@ -145,6 +154,8 @@ def __init__(self):
c = _get_spark_context()
self.spark_context = c[0]
self.job = c[1]
self._groupfiles = None
self._groupsize = None

def arguments(self, defaults):
"""Gets the arguments for a job. When running in glue, the response is pulled from sys.argv
Expand All @@ -161,7 +172,7 @@ def load_data(self, file_paths, dataset_name):
file_paths -- list of file paths to pull from, either absolute paths or s3:// uris
dataset_name -- name of this dataset, used for glue bookmarking
"""
return _load_data(file_paths, dataset_name, self.spark_context)
return _load_data(file_paths, dataset_name, self.spark_context, self._groupfiles, self._groupsize)

def get_all_files_with_prefix(self, bucket, prefix):
"""Given a bucket and file prefix, this method will return a list of all files with that prefix
Expand Down Expand Up @@ -205,5 +216,9 @@ def finish(self):
""" Should be run at the end, will set Glue bookmarks """
_finish(self)

def set_group_files_and_size(self, groupfiles, groupsize):
    """Store the extra file-grouping options used with glue.

    groupfiles -- value saved on the instance as _groupfiles
    groupsize -- value saved on the instance as _groupsize

    See https://docs.aws.amazon.com/glue/latest/dg/grouping-input-files.html
    """
    self._groupfiles, self._groupsize = groupfiles, groupsize


2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import setup
setup(
name = 'aws_glue_etl_docker',
version = '0.2.0',
version = '0.4.0',
packages = ['aws_glue_etl_docker'],
license='mit',
install_requires = ['boto3'],
Expand Down

0 comments on commit 9b68168

Please sign in to comment.