We have developed a Flink application that receives messages with fields like id, value1, value2, ... In the application, the stream is keyed by id and assigned sliding windows of various sizes, each with a slide of 10 seconds. We use an AggregateFunction to compute some statistics for each id, and the resulting data stream is sunk to Redis. The code looks like:
DataStream<Tuple2<String, String>> statistics = messages.keyBy(0)
.timeWindow(Time.seconds(300), Time.seconds(10))
.aggregate(new Min5Aggregate())
.setParallelism(20);
statistics.addSink(new CustomRedisSink()).setParallelism(20);
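For context, here is a minimal sketch of the accumulator logic a class like Min5Aggregate might contain. This is an assumption, since the real class is not shown; in the actual job it would implement Flink's AggregateFunction&lt;IN, ACC, OUT&gt; (createAccumulator / add / getResult / merge), but the logic is written here as plain Java so it can be read in isolation.

```java
// Hypothetical per-window accumulator; the statistic names (min, avg) are
// illustrative, not taken from the question.
public class StatsAccumulator {
    private double min = Double.POSITIVE_INFINITY;
    private double sum = 0.0;
    private long count = 0;

    // Corresponds to AggregateFunction.add(): fold one value into the accumulator.
    public void add(double value) {
        min = Math.min(min, value);
        sum += value;
        count++;
    }

    // Corresponds to AggregateFunction.merge(): combine two partial accumulators.
    public void merge(StatsAccumulator other) {
        min = Math.min(min, other.min);
        sum += other.sum;
        count += other.count;
    }

    // Corresponds to AggregateFunction.getResult(): emit the window statistics.
    public double getMin() { return min; }
    public double getAvg() { return count == 0 ? 0.0 : sum / count; }
}
```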
With this, other systems can read the statistics from Redis and display them, and the results are updated every 10 seconds. But now we are having performance issues with this implementation. I believe one reason is that excess windows are created for ids whose values are not updated very often.
Say the window size is 300 seconds, and there is an id that only receives new values every few hours. Each time new values for this id arrive, 30 (300s / 10s) windows are created, and all of them produce the same aggregation output, since no further values arrive before they expire. What we are doing now is to compare each output with the value already in Redis when sinking the result stream, and skip the update if they are the same.
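The compare-and-skip logic described above looks roughly like the following sketch, with a HashMap standing in for Redis. The class and method names are illustrative; the real CustomRedisSink would perform the same check against Redis before issuing a write.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative dedup logic: write a value only if it differs from what is
// already stored for that id. A HashMap stands in for Redis here.
public class DedupWriter {
    private final Map<String, String> store = new HashMap<>();
    private long skipped = 0;
    private long written = 0;

    public void writeIfChanged(String id, String value) {
        String previous = store.get(id);
        if (value.equals(previous)) {
            skipped++;          // identical output from an overlapping window: skip
        } else {
            store.put(id, value);
            written++;          // real update: push to the store
        }
    }

    public long getSkipped() { return skipped; }
    public long getWritten() { return written; }
}
```

The drawback, as noted above, is that every window firing still costs a round trip (or at least a lookup) against Redis even when nothing changed.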
To optimize performance, I'm wondering whether there is any way in Flink to stop a window from firing when its contents are identical to those of the previous window, so that the comparisons against Redis become unnecessary. Any other optimization suggestions for this system would also be very helpful.
*Since other ids do have frequent value updates, and we need fairly up-to-date statistics, increasing the slide interval is not an option.