The number of files generated during INSERT ... SELECT depends on the number of reducer processes running in the final reduce stage (the final reducer vertex if you are running on Tez) and on the configured bytes per reducer.
If the table is partitioned and no DISTRIBUTE BY is specified, then in the worst case each reducer creates files in every partition. This puts high memory pressure on the reducers and may cause an OOM exception.
To make sure all rows of a partition go to the same reducer, so that each reducer writes files only for the partitions assigned to it, add DISTRIBUTE BY partition_column at the end of your query.
If the data volume is too big and you want more reducers, to increase parallelism and create more files per partition, add a random number to the DISTRIBUTE BY, for example FLOOR(RAND()*100.0)%10. This additionally distributes the data across 10 random buckets, so each partition will get up to 10 files.
The final INSERT statement will look like this:
INSERT OVERWRITE TABLE target_table PARTITION(part_col)
SELECT *
FROM src
DISTRIBUTE BY part_col, FLOOR(RAND()*100.0)%10; --10 files per partition
This configuration setting also affects the number of files generated:
set hive.exec.reducers.bytes.per.reducer=67108864;
If you have too much data, Hive starts more reducers so that each reducer process handles no more than the specified bytes per reducer. The more reducers, the more files are generated. Decreasing this setting may increase the number of reducers, and each of them creates at least one file. If the partition column is not in the DISTRIBUTE BY, each reducer may create files in every partition.
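As a rough worked example of that relationship: the reducer count is approximately the input size of the reduce stage divided by bytes per reducer. This is only an estimate. The 10 GB input size is an assumption for illustration, the count is also capped by hive.exec.reducers.max, and on Tez auto reducer parallelism can change it at runtime.

```sql
-- 64 MB per reducer: 64 * 1024 * 1024 = 67108864 bytes
SET hive.exec.reducers.bytes.per.reducer=67108864;
-- Assumed input to the reduce stage: 10 GB = 10240 MB
-- Estimated reducers: 10240 MB / 64 MB = 160, so roughly 160 files or more

-- Doubling the setting roughly halves the number of reducers and files:
SET hive.exec.reducers.bytes.per.reducer=134217728;  -- 128 MB
-- Estimated reducers: 10240 MB / 128 MB = 80
```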
To make a long story short, use
DISTRIBUTE BY part_col, FLOOR(RAND()*100.0)%10 -- 10 files per partition
If you want 20 files per partition, use FLOOR(RAND()*100.0)%20. This yields about 20 files per partition if you have enough data (and enough reducers), but it does not limit the size of each file.
The bytes per reducer setting does not guarantee a fixed minimum number of files: the number of files depends on total data size / bytes.per.reducer. What it does limit is the amount of data per reducer, and so approximately the maximum size of each file.
But it is much better to use an evenly distributed key, or a combination of keys, with low cardinality instead of a random number. If containers are restarted, rand() may produce different values for the same rows, which can cause data duplication or loss (data already present in one reducer's output may be distributed again to another reducer). You can compute a similar function on some available keys instead of rand() to get a more or less evenly distributed key with low cardinality.
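A minimal sketch of such a deterministic key, assuming the source has some reasonably evenly distributed column (called id here, which is a hypothetical name, as are target_table and src). It uses the Hive built-ins hash() and pmod(), so the same row always lands in the same bucket even if a task is re-executed:

```sql
-- Hypothetical names: target_table, src, part_col, id.
INSERT OVERWRITE TABLE target_table PARTITION (part_col)
SELECT *
FROM src
DISTRIBUTE BY part_col, pmod(hash(id), 10);  -- 10 deterministic buckets per partition
```

How even the buckets are depends on how evenly id is distributed; a heavily skewed column will produce skewed files.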
You can combine both methods: the bytes per reducer limit plus DISTRIBUTE BY, to control both the number of files per partition and the maximum file size.
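The combination might look like the sketch below. Table and column names (target_table, src, part_col, id) are hypothetical, and the bucket key uses hash() on an assumed id column rather than rand(). The last query is one way to check the outcome, using Hive's INPUT__FILE__NAME virtual column to count the files in each partition:

```sql
-- Hypothetical names: target_table, src, part_col, id.
SET hive.exec.reducers.bytes.per.reducer=67108864;  -- about 64 MB of data per reducer

INSERT OVERWRITE TABLE target_table PARTITION (part_col)
SELECT *
FROM src
DISTRIBUTE BY part_col, pmod(hash(id), 20);  -- 20 buckets per partition

-- Check the result: number of distinct files in each partition
SELECT part_col, COUNT(DISTINCT INPUT__FILE__NAME) AS files
FROM target_table
GROUP BY part_col;
```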
Also read this answer about using DISTRIBUTE BY to distribute data evenly between reducers: https://stackoverflow.com/a/38475807/2700344