
apache beam - GroupIntoBatches for non-KV elements

According to the Apache Beam 2.0.0 SDK documentation, GroupIntoBatches works only with KV collections.

My dataset contains only values, and there is no need to introduce keys. However, to make use of GroupIntoBatches I had to add a "fake" key, using an empty string as the key:

// Wraps every element in a KV, using an empty string as a "fake" key.
static class FakeKVFn extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(KV.of("", c.element()));
  }
}

So the overall pipeline looks like the following:

public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  long batchSize = 100L;

  p.apply("ReadLines", TextIO.read().from("./input.txt"))
      .apply("FakeKV", ParDo.of(new FakeKVFn()))
      .apply(GroupIntoBatches.<String, String>ofSize(batchSize))
      .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())))
      .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(callWebService(c.element().getValue()));
        }
      }))
      .apply("WriteResults", TextIO.write().to("./output/"));

  p.run().waitUntilFinish();
}

Is there any way to group into batches without introducing “fake” keys?


1 Answer


It is required to provide KV inputs to GroupIntoBatches because the transform is implemented using state and timers, which are per key-and-window.

For each key+window pair, state and timers necessarily execute serially (or observably so). You have to manually express the available parallelism by providing keys (and windows, though no runner that I know of parallelizes over windows today). The two most common approaches are:

  1. Use a natural key, such as a user ID.
  2. Choose some fixed number of shards and key randomly (see the sketch after this list). This can be harder to tune: you need enough shards to get enough parallelism, but each shard must include enough data for GroupIntoBatches to actually be useful.
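
Here is a minimal sketch of the second approach, assuming the Beam Java SDK used in the question; RandomShardKeyFn and NUM_SHARDS are illustrative names, not part of the Beam API:

import java.util.concurrent.ThreadLocalRandom;

// Key each element by a random shard number so that state and timers can run
// on NUM_SHARDS keys in parallel instead of on a single dummy key.
static class RandomShardKeyFn extends DoFn<String, KV<Integer, String>> {
  // Illustrative shard count; tune it so every shard still receives enough
  // elements to fill batches of the desired size.
  private static final int NUM_SHARDS = 10;

  @ProcessElement
  public void processElement(ProcessContext c) {
    int shard = ThreadLocalRandom.current().nextInt(NUM_SHARDS);
    c.output(KV.of(shard, c.element()));
  }
}

The pipeline then stays the same except for the keying step and the key coder:

p.apply("ReadLines", TextIO.read().from("./input.txt"))
    .apply("RandomShardKey", ParDo.of(new RandomShardKeyFn()))
    .apply(GroupIntoBatches.<Integer, String>ofSize(batchSize))
    // ... rest of the pipeline as in the question, with VarIntCoder.of()
    // in place of StringUtf8Coder.of() for the key coder.

The random key carries no meaning; it exists only to unlock parallelism, so it can simply be dropped when each batch is processed.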

Adding a single dummy key to all elements, as in your snippet, will cause the transform not to execute in parallel at all. This is similar to the discussion in "Stateful indexing causes ParDo to be run single-threaded on Dataflow Runner".

