Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


redis - Optimization of Flink windows with small slide size and same contents?

We have developed a Flink application that receives messages with fields like id, value1, value2, .... In the application, messages are keyed by id and assigned sliding windows of various sizes, each with a slide of 10 seconds. We use an AggregateFunction to compute statistics for each id, and the resulting data stream is sunk to Redis. The code looks like:

DataStream<Tuple2<String, String>> statistics = messages.keyBy(0)
    .timeWindow(Time.seconds(300), Time.seconds(10))
    .aggregate(new Min5Aggregate())
    .setParallelism(20);

statistics.addSink(new CustomRedisSink()).setParallelism(20);

With this, other systems can use and display the statistics by reading Redis, and the results are updated every 10 seconds. But we are now having performance issues with this implementation. I believe one reason is the excess windows created for ids that do not receive value updates very often.

Say the window size is 300 seconds, and there is one id that only gets new values every few hours. Each time new values for this id arrive, 30 (300 s / 10 s) windows are created, and they all produce the same aggregation output, since no further values arrive before they expire. What we do now is compare each output with the value already in Redis when sinking the result stream, and skip the update if they are equal.

To optimize performance, I'm wondering whether there is any way in Flink to stop a window from triggering FIRE when its contents are identical to those of the window before it, so that the comparisons with Redis become unnecessary. Any other optimization suggestions for this system would also be very helpful.

*Since other ids receive frequent value updates and we need reasonably up-to-date statistics, increasing the slide size is not an option.



1 Answer


I don't believe a Trigger can keep state from the previous window. A ProcessWindowFunction can keep state across windows, so that's one option.

One fairly straightforward solution would be to insert a RichFlatMapFunction between the window and the sink that remembers the previous result for each key and only produces output when the new result differs.
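A minimal sketch of that suppression logic, stripped of the Flink specifics (in a real RichFlatMapFunction the map below would be a keyed ValueState, and the class/method names here are illustrative, not from any Flink API):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Per-key duplicate suppression: forward a window result only when it
// differs from the last result emitted for the same key. Inside a Flink
// RichFlatMapFunction, the map would be replaced by keyed ValueState.
class DedupFilter<K, V> {
    private final Map<K, V> lastEmitted = new HashMap<>();

    /** Returns true if this value should be forwarded to the sink. */
    boolean shouldEmit(K key, V value) {
        V previous = lastEmitted.put(key, value);
        return !Objects.equals(previous, value);
    }
}
```

The first result for a key always passes through; repeated identical window outputs for an inactive id are dropped, so Redis is only written when something actually changed.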

For more elaborate optimizations of sliding windows, you can implement the windowing yourself as a KeyedProcessFunction. That way you can keep the thirty 10-second slices as well as the fully aggregated 300-second result, and every 10 seconds all you need to do is subtract out the oldest 10 seconds and add in the newest 10 seconds. For busy keys this should be more efficient than adding each event into 30 windows, but doing all the bookkeeping yourself is definitely more work. The Flink docs include an example of doing this for tumbling event-time windows; extending it to sliding windows is left to the reader.
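The slice idea can be sketched framework-free for a sliding sum (note the subtract-the-oldest-slice trick requires an invertible aggregate; for a min like the question's Min5Aggregate you would instead recompute from the retained slices). In Flink this state would live in a KeyedProcessFunction with a timer firing every slide interval; the class below is a hypothetical illustration, not Flink API:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// One partial aggregate per 10 s slice; the 300 s window result is a
// rolling total. Advancing the window subtracts the expired slice and
// adds the newest one, instead of re-adding every event into 30 windows.
class SlidingSum {
    private final Deque<Double> slices = new ArrayDeque<>(); // oldest first
    private final int numSlices; // window size / slide, e.g. 300 / 10 = 30
    private double currentSlice = 0.0;
    private double total = 0.0;

    SlidingSum(int numSlices) { this.numSlices = numSlices; }

    /** Called for each arriving event. */
    void add(double value) { currentSlice += value; }

    /** Called once per slide (every 10 s); returns the window result. */
    double advance() {
        slices.addLast(currentSlice);
        total += currentSlice;
        currentSlice = 0.0;
        if (slices.size() > numSlices) {
            total -= slices.removeFirst(); // expire the oldest slice
        }
        return total;
    }
}
```

Per-event work is now O(1) regardless of the window-size-to-slide ratio, which is where the savings for busy keys comes from.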

