I found the solution to the above issue, so I am listing the required steps here.
Steps
- Add the following configuration to the flink-conf.yaml file:
```yaml
state.backend: filesystem
state.checkpoints.dir: s3://s3-bucket/checkpoints/ # "s3://<your-bucket>/<endpoint>"
state.backend.fs.checkpointdir: s3://s3-bucket/checkpoints/ # "s3://<your-bucket>/<endpoint>"
s3.access-key: XXXXXXXXXXXXXXXXXXX # your-access-key
s3.secret-key: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx # your-secret-key
s3.endpoint: http://127.0.0.1:9000 # your-endpoint-hostname (I have used Minio)
```
- After completing the first step, copy the respective JAR files (flink-s3-fs-hadoop-1.10.0.jar and flink-s3-fs-presto-1.10.0.jar) from the opt directory to the plugins directory of your Flink installation. E.g.:
  1. Copy /flink-1.10.0/opt/flink-s3-fs-hadoop-1.10.0.jar to /flink-1.10.0/plugins/s3-fs-hadoop/flink-s3-fs-hadoop-1.10.0.jar (recommended for the StreamingFileSink)
  2. Copy /flink-1.10.0/opt/flink-s3-fs-presto-1.10.0.jar to /flink-1.10.0/plugins/s3-fs-presto/flink-s3-fs-presto-1.10.0.jar (recommended for checkpointing)
- Add this to your checkpointing code:

```java
env.setStateBackend(new FsStateBackend("s3://s3-bucket/checkpoints/"));
```
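
For context, here is a minimal runnable sketch of where that call sits in a job. The checkpoint interval, bucket name, class name, and sample data are placeholders, and it assumes the Flink 1.10 Java API:

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointToS3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60 seconds (placeholder interval; tune for your job).
        env.enableCheckpointing(60_000);

        // Store checkpoint state in S3 (Minio in this setup); the path should
        // match state.checkpoints.dir in flink-conf.yaml.
        env.setStateBackend(new FsStateBackend("s3://s3-bucket/checkpoints/"));

        // Trivial pipeline just so the job has something to run.
        env.fromElements("a", "b", "c").print();

        env.execute("checkpoint-to-s3-example");
    }
}
```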
- After completing all the above steps, restart Flink if it is already running.
Note:
- If you are using both plugins (flink-s3-fs-hadoop and flink-s3-fs-presto) in Flink, then use the scheme s3p:// specifically for flink-s3-fs-presto and s3a:// for flink-s3-fs-hadoop, instead of s3://, as in the sketch below.
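
If you do run both plugins, a job using the two schemes side by side might look like the following sketch; the bucket name, output path, and sample data are hypothetical:

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class TwoS3SchemesExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // s3p:// routes through flink-s3-fs-presto (recommended for checkpointing).
        env.setStateBackend(new FsStateBackend("s3p://s3-bucket/checkpoints/"));

        // s3a:// routes through flink-s3-fs-hadoop (recommended for the StreamingFileSink).
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3a://s3-bucket/output/"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();

        env.fromElements("a", "b", "c").addSink(sink);

        env.execute("two-s3-schemes-example");
    }
}
```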
- For more details, see the official Flink documentation on S3 file systems.