TL;DR Use timestampFormat
option (not dateFormat
).
I've managed to reproduce it in the latest Spark version 2.3.0-SNAPSHOT (built from the master).
// OS shell
$ cat so-43259485.csv
"EventDate"|"Name"
"2016/12/19 00:43:27.583"|"adam"
// spark-shell
scala> spark.version
res1: String = 2.3.0-SNAPSHOT
case class Event(EventDate: java.sql.Timestamp, Name: String)
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Event].schema
scala> spark
.read
.format("csv")
.option("header", true)
.option("mode","FAILFAST")
.option("delimiter","|")
.schema(schema)
.load("so-43259485.csv")
.show(false)
17/04/08 11:03:42 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
at java.sql.Timestamp.valueOf(Timestamp.java:237)
at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTime(DateTimeUtils.scala:167)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply$mcJ$sp(UnivocityParser.scala:146)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anonfun$makeConverter$9$$anonfun$apply$17$$anonfun$apply$6.apply(UnivocityParser.scala:146)
at scala.util.Try.getOrElse(Try.scala:79)
The corresponding line in the Spark sources is the "root cause" of the issue:
Timestamp.valueOf(s)
Having read the javadoc of Timestamp.valueOf, you can learn that the argument should be:
timestamp in format yyyy-[m]m-[d]d hh:mm:ss[.f...]
. The fractional seconds may be omitted. The leading zero for mm and dd may also be omitted.
Note "The fractional seconds may be omitted" so let's cut it off by first loading the EventDate as a String and only after removing the unneeded fractional seconds convert it to Timestamp.
val eventsAsString = spark.read.format("csv")
.option("header", true)
.option("mode","FAILFAST")
.option("delimiter","|")
.load("so-43259485.csv")
It turns out that for fields of TimestampType
type Spark uses timestampFormat
option first if defined and only if not uses the code the uses Timestamp.valueOf
.
It turns out the fix is just to use timestampFormat
option (not dateFormat
!).
val df = spark.read
.format("com.databricks.spark.csv")
.option("header","true")
.option("delimiter","|")
.option("mode","FAILFAST")
.option("inferSchema","false")
.option("timestampFormat","yyyy/MM/dd HH:mm:ss.SSS")
.schema(sourceSchemaStruct)
.load(sourceFile)
scala> df.show(false)
+-----------------------+----+
|EventDate |Name|
+-----------------------+----+
|2016-12-19 00:43:27.583|adam|
+-----------------------+----+
Spark 2.1.0
Use schema inference in CSV using inferSchema
option with your custom timestampFormat
.
It's important to trigger schema inference using inferSchema
for timestampFormat
to take effect.
val events = spark.read
.format("csv")
.option("header", true)
.option("mode","FAILFAST")
.option("delimiter","|")
.option("inferSchema", true)
.option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
.load("so-43259485.csv")
scala> events.show(false)
+-------------------+----+
|EventDate |Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+
scala> events.printSchema
root
|-- EventDate: timestamp (nullable = true)
|-- Name: string (nullable = true)
"Incorrect" initial version left for learning purposes
val events = eventsAsString
.withColumn("date", split($"EventDate", " ")(0))
.withColumn("date", translate($"date", "/", "-"))
.withColumn("time", split($"EventDate", " ")(1))
.withColumn("time", split($"time", "[.]")(0)) // <-- remove millis part
.withColumn("EventDate", concat($"date", lit(" "), $"time")) // <-- make EventDate right
.select($"EventDate" cast "timestamp", $"Name")
scala> events.printSchema
root
|-- EventDate: timestamp (nullable = true)
|-- Name: string (nullable = true)
events.show(false)
scala> events.show
+-------------------+----+
| EventDate|Name|
+-------------------+----+
|2016-12-19 00:43:27|adam|
+-------------------+----+
Spark 2.2.0
As of Spark 2.2 you can use to_timestamp
function to do the string to timestamp conversion.
eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)
scala> eventsAsString.select($"EventDate", to_timestamp($"EventDate", "yyyy/MM/dd HH:mm:ss.SSS")).show(false)
+-----------------------+----------------------------------------------------+
|EventDate |to_timestamp(`EventDate`, 'yyyy/MM/dd HH:mm:ss.SSS')|
+-----------------------+----------------------------------------------------+
|2016/12/19 00:43:27.583|2016-12-19 00:43:27 |
+-----------------------+----------------------------------------------------+