I am trying to create an aggregate file for end users to use, so they don't have to process multiple, much larger source files. To do that I:
A) iterate through all source folders, pulling out the 12 most commonly requested fields and writing Parquet files to a new location where those results are co-located.
B) go back through the files created in step A and re-aggregate them, grouping by the 12 fields to reduce the data to one summary row per unique combination.
What I'm finding is that step A reduces the payload about 5:1 (roughly 250 GB becomes about 48.5 GB). Step B, however, instead of reducing it further, increases the size by more than 60% over step A (48.65 GB grows to 79.32 GB). My row counts do match, though.
This is using Spark 1.5.2.
My code, modified only to replace the actual field names with field1...field12 for readability, is below, with the results I've noted in comments.
While I don't necessarily expect another 5:1 reduction, I don't know what I'm doing wrong that increases the storage size for fewer rows with the same schema. Can anyone help me understand what I did wrong?
Thanks!
//for each eventName found in separate source folders, do the following:
//write out one row per source record, keeping only the key fields, for quicker availability to clients
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlStatement).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
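//coalesce(20) caps each event's output at 20 part files, so the total file count grows with the number of event folders processed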
//results in over 700 files with a total of 16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed
//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in 3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed
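//A minimal cross-check sketch (paths are the same placeholders as above; expected values are the ones already noted): read both outputs back and confirm the row counts and the rCount total line up.
import org.apache.spark.sql.functions.sum
val stepA = sqlContext.read.parquet("<aws folder>" + dt + "/*/")
val stepB = sqlContext.read.parquet("<a new aws folder>" + dt + "/")
println(stepA.count())          //expected: 16,969,050,506
println(stepB.count())          //expected: 3,295,206,761
stepB.agg(sum("rCount")).show() //expected: 16,969,050,506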
//The parquet schemas created (both tables match):
|-- field1: string (nullable = true) (10 characters)
|-- field2: string (nullable = true) (15 characters)
|-- field3: string (nullable = true) (50 characters max)
|-- field4: string (nullable = true) (10 characters)
|-- field5: string (nullable = true) (10 characters)
|-- field6: string (nullable = true) (10 characters)
|-- field7: string (nullable = true) (16 characters)
|-- field8: string (nullable = true) (10 characters)
|-- field9: string (nullable = true) (15 characters)
|-- field10: string (nullable = true) (20 characters)
|-- field11: string (nullable = true) (14 characters)
|-- field12: string (nullable = true) (14 characters)
|-- rCount: long (nullable = true)
|-- dt: string (nullable = true)
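//For completeness, a sketch of how the schema and the per-field widths above can be checked (this assumes the "results" temp table registered earlier; the width check on field3 is just illustrative):
import org.apache.spark.sql.functions.{length, max}
val results = sqlContext.table("results")
results.printSchema()
results.agg(max(length(results("field3")))).show() //should report at most 50 characters, per the notes above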