Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


apache spark - Why does df.limit keep changing in PySpark?

I'm creating a data sample from some dataframe df with

rdd = df.limit(10000).rdd

This operation takes quite some time (why, actually? Can it not short-circuit after 10000 rows?), so I assume I now have a new RDD.

However, when I work on rdd, it contains different rows every time I access it, as if it were resampled on each access. Caching the RDD helps a bit, but surely that's not safe?

What is the reason behind it?

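Update: Here is a reproduction on Spark 1.5.2:

from operator import add
from pyspark.sql import Row

# `sc` is assumed to be an existing SparkContext (e.g. from the pyspark shell)
rdd = sc.parallelize([Row(i=i) for i in range(1000000)], 100)
rdd1 = rdd.toDF().limit(1000).rdd
# Each reduce is a separate action, so the limit is evaluated again every time
for _ in range(3):
    print(rdd1.map(lambda row: row.i).reduce(add))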

The output is

499500
19955500
49651500

I'm surprised that .rdd doesn't fix the data.

EDIT: To show that it gets trickier than the re-execution issue, here is a single action that produces incorrect results on Spark 2.0.0.2.5.0:

from pyspark.sql import Row

rdd = sc.parallelize([Row(i=i) for i in range(1000000)], 200)
rdd1 = rdd.toDF().limit(12345).rdd
rdd2 = rdd1.map(lambda x: (x, x))
rdd2.join(rdd2).count()
# result is 10240 despite doing a self-join

Basically, whenever you use limit, your results might be wrong. I don't mean "just one of many possible samples", but really incorrect (since in this case the result should always be 12345).


1 Answer


Because Spark is distributed, it is generally not safe to assume deterministic results. Your example takes the "first" 10,000 rows of a DataFrame, but without an explicit ordering, "first" is ambiguous (and hence non-deterministic): what it means depends on the internals of Spark. For example, the rows could come from whichever partition responds to the driver first, and that partition can change with network conditions, data locality, and so on.
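The ambiguity of "first" can be illustrated with a toy model in plain Python. This is only a sketch of the idea, not Spark's actual implementation: the helper names unordered_limit and ordered_limit are hypothetical, and the shuffle merely stands in for whatever scheduling or network timing decides which partition is read first.

```python
import random


def unordered_limit(partitions, n, rng):
    """Toy model: take n rows from partitions in the order they happen to respond."""
    order = list(range(len(partitions)))
    rng.shuffle(order)  # assumption: stands in for scheduling / network timing
    taken = []
    for idx in order:
        for row in partitions[idx]:
            if len(taken) == n:
                return taken
            taken.append(row)
    return taken


def ordered_limit(partitions, n):
    """Toy model: with an explicit total order, the first n rows are well defined."""
    return sorted(row for part in partitions for row in part)[:n]


# Ten partitions of ten consecutive integers each
partitions = [list(range(start, start + 10)) for start in range(0, 100, 10)]
rng = random.Random()

# Mirrors the reproduction in the question: the same limit, summed three times
for _ in range(3):
    print(sum(unordered_limit(partitions, 10, rng)),
          sum(ordered_limit(partitions, 10)))
```

In this model the first column can differ from run to run because a different partition may be read first, while the second column stays the same because sorting gives "first" a single meaning.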

Even once you cache the data, I still wouldn't rely on getting the same data back every time (cached partitions can be evicted and recomputed), though I would certainly expect it to be more consistent than reading from disk.
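One possible way to get a reproducible sample is to give "first" a definite meaning by sorting on a column before applying the limit. The sketch below is a suggestion rather than something taken from the question: stable_sample is a hypothetical helper, and it assumes the values in order_col are unique, since ties would bring the ambiguity back. It uses only the DataFrame methods orderBy, limit and cache.

```python
def stable_sample(df, n, order_col):
    """Return the n rows with the smallest order_col values, cached for reuse.

    Hypothetical helper. Assumes order_col holds unique values, so that
    re-evaluating the plan selects the same n rows even if the cached
    data is dropped and recomputed.
    """
    return df.orderBy(order_col).limit(n).cache()
```

With the DataFrame from the question this could be called as stable_sample(rdd.toDF(), 1000, "i").rdd. The sort makes the limit more expensive than an unordered one, so this trades speed for repeatability.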

