@@ -332,14 +332,16 @@ public void testWriteAndReadTemporalAndStructColumns() throws IOException {
332332 .selectExpr (
333333 "cast(id as int) as id" ,
334334 "CASE WHEN id = 0 THEN CAST('2024-01-02' AS DATE) ELSE CAST('2024-02-03' AS DATE) END AS event_date" ,
335- "CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP) "
336- + "ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END AS event_ts" ,
337- "named_struct("
338- + "'event_date', CASE WHEN id = 0 THEN CAST('2024-01-02' AS DATE) ELSE CAST('2024-02-03' AS DATE) END, "
339- + "'event_ts', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP) "
340- + "ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END, "
341- + "'label', CASE WHEN id = 0 THEN 'alpha' ELSE 'beta' END"
342- + ") AS payload" );
335+ """
336+ CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP)
337+ ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END AS event_ts""" ,
338+ """
339+ named_struct(
340+ 'event_date', CASE WHEN id = 0 THEN CAST('2024-01-02' AS DATE) ELSE CAST('2024-02-03' AS DATE) END,
341+ 'event_ts', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP)
342+ ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP) END,
343+ 'label', CASE WHEN id = 0 THEN 'alpha' ELSE 'beta' END
344+ ) AS payload""" );
343345
344346 Path outputPath = tempDir .resolve ("temporal_struct_output" );
345347 originalDf
@@ -371,15 +373,13 @@ public void testWriteAndReadTemporalAndStructColumns() throws IOException {
371373 @ Test
372374 @ DisplayName ("Write TimestampNTZ columns and nested structs" )
373375 public void testWriteTimestampNtzColumns () throws IOException {
374- Dataset <Row > timestampNtzDf = spark .range (0 , 2 )
375- .selectExpr (
376- "cast(id as int) as id" ,
377- "CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ) "
378- + "ELSE CAST(NULL AS TIMESTAMP_NTZ) END AS event_ntz" ,
379- "named_struct("
380- + "'event_ntz', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ) "
381- + "ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP_NTZ) END"
382- + ") AS payload" );
376+ Dataset <Row > timestampNtzDf = spark .range (0 , 2 ).selectExpr ("cast(id as int) as id" , """
377+ CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ)
378+ ELSE CAST(NULL AS TIMESTAMP_NTZ) END AS event_ntz""" , """
379+ named_struct(
380+ 'event_ntz', CASE WHEN id = 0 THEN CAST('2024-01-02 03:04:05.123456' AS TIMESTAMP_NTZ)
381+ ELSE CAST('2024-02-03 04:05:06.654321' AS TIMESTAMP_NTZ) END
382+ ) AS payload""" );
383383
384384 Path outputPath = tempDir .resolve ("timestamp_ntz_output" );
385385 assertDoesNotThrow (() -> timestampNtzDf
@@ -406,18 +406,15 @@ private Dataset<Row> createTestDataFrame(int numRows) {
406406 }
407407
408408 private List <String > projectTemporalAndStructRows (Dataset <Row > df ) {
409- return df
410- .orderBy ("id" )
411- .selectExpr ("to_json(named_struct("
412- + "'id', id, "
413- + "'event_date', cast(event_date as string), "
414- + "'event_ts', date_format(event_ts, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), "
415- + "'payload_event_date', cast(payload.event_date as string), "
416- + "'payload_event_ts', date_format(payload.event_ts, 'yyyy-MM-dd HH:mm:ss.SSSSSS'), "
417- + "'payload_label', payload.label"
418- + ")) as json" )
419- .collectAsList ()
420- .stream ()
409+ return df .orderBy ("id" ).selectExpr ("""
410+ to_json(named_struct(
411+ 'id', id,
412+ 'event_date', cast(event_date as string),
413+ 'event_ts', date_format(event_ts, 'yyyy-MM-dd HH:mm:ss.SSSSSS'),
414+ 'payload_event_date', cast(payload.event_date as string),
415+ 'payload_event_ts', date_format(payload.event_ts, 'yyyy-MM-dd HH:mm:ss.SSSSSS'),
416+ 'payload_label', payload.label
417+ )) as json""" ).collectAsList ().stream ()
421418 .map (row -> row .getString (0 ))
422419 .collect (Collectors .toList ());
423420 }
0 commit comments