Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
885 views
in Technique[技术] by (71.8m points)

apache spark - Why does foreach not bring anything to the driver program?

I wrote this program in spark shell

val array = sc.parallelize(List(1, 2, 3, 4))
array.foreach(x => println(x))

this prints some debug statements but not the actual numbers.

The code below works fine

for(num <- array.take(4)) {
  println(num)
}

I do understand that take is an action and therefore will cause spark to trigger the lazy computation.

But foreach should have worked the same way... why did foreach not bring anything back from spark and start doing the actual processing (get out of lazy mode)

How can I make the foreach on the rdd work?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

The RDD.foreach method in Spark runs on the cluster so each worker which contains these records is running the operations in foreach. I.e. your code is running, but they are printing out on the Spark workers stdout, not in the driver/your shell session. If you look at the output (stdout) for your Spark workers, you will see these printed to the console.

You can view the stdout on the workers by going to the web gui running for each running executor. An example URL is http://workerIp:workerPort/logPage/?appId=app-20150303023103-0043&executorId=1&logType=stdout

Spark Executor Stdout

In this example Spark chooses to put all the records of the RDD in the same partition.

This makes sense if you think about it - look at the function signature for foreach - it doesn't return anything.

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit

This is really the purpose of foreach in scala - its used to side effect.

When you collect records, you bring them back into the driver so logically collect/take operations are just running on a Scala collection within the Spark driver - you can see the log output as the spark driver/spark shell is whats printing to stdout in your session.

A use case of foreach may not seem immediately apparent, an example - if for each record in the RDD you wanted to do some external behaviour, like call a REST api, you could do this in the foreach, then each Spark worker would submit a call to the API server with the value. If foreach did bring back records, you could easily blow out the memory in the driver/shell process. This way you avoid these issues and can do side-effects on all the items in an RDD over the cluster.

If you want to see whats in an RDD I use;

array.collect.foreach(println) 
//Instead of collect, use take(...) or takeSample(...) if the RDD is large

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...