
Column pruning not working with Spark Structured Streaming

I'm using Spark 2.4.7 and have written a custom Spark Structured Streaming data source with support for column pruning:

class MyMicroBatchReader(...) extends MicroBatchReader with SupportsPushDownRequiredColumns { ... }

The problem is that column pruning is not working: the pruneColumns(requiredSchema) method is never called.
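
In case it helps, here is a minimal sketch of how the pruning hooks are wired in, written against the Spark 2.4 DataSourceV2 API. The fullSchema constructor parameter is a stand-in for my real schema handling, and the offset/partition methods are stubbed out because they are not relevant to pruning:

import java.util.{List => JList, Optional}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{InputPartition, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.StructType

class MyMicroBatchReader(fullSchema: StructType)
    extends MicroBatchReader with SupportsPushDownRequiredColumns {

  // Start from the full source schema; Spark is expected to narrow it.
  private var prunedSchema: StructType = fullSchema

  // Should be called by Spark with only the columns the query needs --
  // this is the method that is never invoked.
  override def pruneColumns(requiredSchema: StructType): Unit = {
    prunedSchema = requiredSchema
  }

  // The scan must report whatever schema is left after pruning.
  override def readSchema(): StructType = prunedSchema

  // Offset handling and partition planning elided: not relevant here.
  override def setOffsetRange(start: Optional[Offset], end: Optional[Offset]): Unit = ???
  override def getStartOffset: Offset = ???
  override def getEndOffset: Offset = ???
  override def deserializeOffset(json: String): Offset = ???
  override def commit(end: Offset): Unit = ???
  override def stop(): Unit = ???
  override def planInputPartitions(): JList[InputPartition[InternalRow]] = ???
}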

To provide more detail, if I run, for example:

spark.readStream.format("mysource").load().select("Id").writeStream.format("console").start()

Then the output of the explain() command is:

== Parsed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4b710a86
+- Project [Id#249]
   +- Project [_timestamp#440 AS _timestamp#248, Id#441 AS Id#249, fieldA#442 AS fieldA#250, fieldB#443 AS fieldB#251, ... 37 more fields]
      +- Streaming RelationV2 mysource[_timestamp#440, Id#441, fieldA#442, fieldB#443, ... 37 more fields] (Options: [])

== Analyzed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4b710a86
+- Project [Id#249]
   +- Project [_timestamp#440 AS _timestamp#248, Id#441 AS Id#249, fieldA#442 AS fieldA#250, fieldB#443 AS fieldB#251, ... 37 more fields]
      +- Streaming RelationV2 mysource[_timestamp#440, Id#441, fieldA#442, fieldB#443, ... 37 more fields] (Options: [])

== Optimized Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4b710a86
+- Project [Id#441]
   +- Streaming RelationV2 mysource[_timestamp#440, Id#441, fieldA#442, fieldB#443, ... 37 more fields] (Options: [])

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@4b710a86
+- *(1) Project [Id#441]
   +- *(1) Project [_timestamp#440 AS _timestamp#248, Id#441 AS Id#249, fieldA#442 AS fieldA#250, fieldB#443 AS fieldB#251, ... 37 more fields]
      +- *(1) ScanV2 mysource[_timestamp#440, Id#441, fieldA#442, fieldB#443, ... 37 more fields] (Options: [])
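
As a side check, a batch query against the same source can show whether pruning works outside the streaming path. This is just a sketch and assumes the source also implements the batch read path, which is not shown above:

// Assumes "mysource" also supports batch reads (an assumption here).
// If pruning were applied, the scan node in the batch plan would list
// only the Id column instead of all 41 fields.
spark.read.format("mysource").load().select("Id").explain(true)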

As the plans show, the optimizer reduces the projection to Id#441, but the ScanV2 node still lists all 41 columns, so the pruning never reaches the source. Do you have any idea why this is not working?

Thanks



1 Answer

Waiting for an expert to answer.
