Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cbits/process_csv.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ static uint64_t find_character_in_chunk(uint8_t *in, uint8_t c) {
// Let's go ahead and assume `in` will only ever get 64 bytes
// initial_quoted will be either all_ones ~0ULL or all_zeros 0ULL
#ifdef HAS_SIMD_CSV
static uint64_t parse_chunk(uint8_t *in, uint64_t *initial_quoted) {
static uint64_t parse_chunk(uint8_t *in, uint8_t separator, uint64_t *initial_quoted) {
uint64_t quotebits = find_character_in_chunk(in, QUOTE_CHAR);
// See https://wunkolo.github.io/post/2020/05/pclmulqdq-tricks/
// Also, section 3.1.1 of Parsing Gigabytes of JSON per Second,
Expand All @@ -141,7 +141,7 @@ static uint64_t parse_chunk(uint8_t *in, uint64_t *initial_quoted) {
// at the last bit
(*initial_quoted) = (uint64_t)((int64_t)quotemask >> 63);

uint64_t commabits = find_character_in_chunk(in, COMMA_CHAR);
uint64_t commabits = find_character_in_chunk(in, separator);
uint64_t newlinebits = find_character_in_chunk(in, NEWLINE_CHAR);

uint64_t delimiter_bits = (commabits | newlinebits) & ~quotemask;
Expand Down Expand Up @@ -170,7 +170,7 @@ static size_t find_one_indices(size_t start_index, uint64_t bits, size_t *indice
}
#endif

size_t get_delimiter_indices(uint8_t *buf, size_t len, size_t *indices) {
size_t get_delimiter_indices(uint8_t *buf, size_t len, uint8_t separator, size_t *indices) {
// Recall we padded our file with 64 empty bytes.
// So if, for example, we had a file of 187 bytes
// We pad it with zeros and so we have 251 bytes
Expand All @@ -185,7 +185,7 @@ size_t get_delimiter_indices(uint8_t *buf, size_t len, size_t *indices) {
size_t base = 0;
for (size_t i = 0; i < unpaddedLen; i += 64) {
uint8_t *in = buf + i;
uint64_t delimiter_bits = parse_chunk(in, &initial_quoted);
uint64_t delimiter_bits = parse_chunk(in, separator, &initial_quoted);
find_one_indices(i, delimiter_bits, indices, &base);
}
return base;
Expand Down
4 changes: 1 addition & 3 deletions cbits/process_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@
#define ALL_ONES_MASK ~0ULL
#define ALL_ZEROS_MASK 0ULL

// Only expose a portable API in the public header.
// Implementation may use platform-specific intrinsics internally.
size_t get_delimiter_indices(uint8_t *buf, size_t len, size_t* indices);
size_t get_delimiter_indices(uint8_t *buf, size_t len, uint8_t separator, size_t* indices);

#endif
7 changes: 6 additions & 1 deletion dataframe.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ tested-with: GHC ==9.4.8 || ==9.6.7 || ==9.8.4 || ==9.10.3 || ==9.12.2
extra-doc-files: CHANGELOG.md README.md
extra-source-files: cbits/process_csv.h
tests/data/*.csv
tests/data/*.tsv
tests/data/*.parquet
tests/data/unstable_csv/*.csv
tests/data/unstable_csv/*.tsv
-- tests/data/*.md
-- tests/data/*.bin
-- tests/data/*.json
Expand Down Expand Up @@ -182,12 +185,14 @@ test-suite tests
Parquet
build-depends: base >= 4 && < 5,
dataframe ^>= 0.4,
directory >= 1.3.0.0 && < 2,
HUnit ^>= 1.6,
random >= 1 && < 2,
random-shuffle >= 0.0.4 && < 1,
random >= 1 && < 2,
text >= 2.0 && < 3,
time >= 1.12 && < 2,
vector ^>= 0.13
vector ^>= 0.13,
containers >= 0.6.7 && < 0.9
hs-source-dirs: tests
default-language: Haskell2010
2 changes: 2 additions & 0 deletions src/DataFrame.hs
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,9 @@ import DataFrame.IO.CSV as CSV (
import DataFrame.IO.Parquet as Parquet (readParquet)
import DataFrame.IO.Unstable.CSV as UnstableCSV (
fastReadCsvUnstable,
fastReadTsvUnstable,
readCsvUnstable,
readTsvUnstable,
)
import DataFrame.Internal.Column as Column (
Column,
Expand Down
80 changes: 53 additions & 27 deletions src/DataFrame/IO/Unstable/CSV.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
module DataFrame.IO.Unstable.CSV (
fastReadCsvUnstable,
readCsvUnstable,
fastReadTsvUnstable,
readTsvUnstable,
) where

import qualified Data.Vector as Vector
Expand Down Expand Up @@ -52,27 +54,42 @@ import DataFrame.IO.CSV (
import DataFrame.Internal.DataFrame (DataFrame (..))
import DataFrame.Operations.Typing (parseFromExamples)

fastReadCsvUnstable :: FilePath -> IO DataFrame
fastReadCsvUnstable =
readCsvUnstable'
readSeparatedDefaultFast :: Word8 -> FilePath -> IO DataFrame
readSeparatedDefaultFast separator =
readSeparated
separator
defaultReadOptions
getDelimiterIndices

readCsvUnstable :: FilePath -> IO DataFrame
readCsvUnstable =
readCsvUnstable'
readSeparatedDefault :: Word8 -> FilePath -> IO DataFrame
readSeparatedDefault separator =
readSeparated
separator
defaultReadOptions
( \originalLen v -> do
( \separator originalLen v -> do
indices <- mallocArray originalLen
getDelimiterIndices_ originalLen v indices
getDelimiterIndices_ separator originalLen v indices
)

readCsvUnstable' ::
fastReadCsvUnstable :: FilePath -> IO DataFrame
fastReadCsvUnstable = readSeparatedDefaultFast comma

readCsvUnstable :: FilePath -> IO DataFrame
readCsvUnstable = readSeparatedDefault comma

fastReadTsvUnstable :: FilePath -> IO DataFrame
fastReadTsvUnstable = readSeparatedDefaultFast tab

readTsvUnstable :: FilePath -> IO DataFrame
readTsvUnstable = readSeparatedDefault tab

readSeparated ::
Word8 ->
ReadOptions ->
(Int -> VS.Vector Word8 -> IO (VS.Vector CSize)) ->
(Word8 -> Int -> VS.Vector Word8 -> IO (VS.Vector CSize)) ->
FilePath ->
IO DataFrame
readCsvUnstable' opts delimiterIndices filePath = do
readSeparated separator opts delimiterIndices filePath = do
-- We use write copy mode so that we can append
-- padding to the end of the memory space
(bufferPtr, offset, len) <-
Expand All @@ -83,7 +100,7 @@ readCsvUnstable' opts delimiterIndices filePath = do
let mutableFile = unsafeFromForeignPtr bufferPtr offset len
paddedMutableFile <- grow mutableFile 64
paddedCSVFile <- VS.unsafeFreeze paddedMutableFile
indices <- delimiterIndices len paddedCSVFile
indices <- delimiterIndices separator len paddedCSVFile
let numCol = countColumnsInFirstRow paddedCSVFile indices
totalRows = VS.length indices `div` numCol
extractField' = extractField paddedCSVFile indices
Expand Down Expand Up @@ -142,7 +159,8 @@ extractField ::
Int ->
Text
extractField file indices position =
TextEncoding.decodeUtf8Lenient
Text.strip
. TextEncoding.decodeUtf8Lenient
. unsafeToByteString
$ VS.slice
previous
Expand All @@ -163,15 +181,17 @@ foreign import capi "process_csv.h get_delimiter_indices"
get_delimiter_indices ::
Ptr CUChar -> -- input
CSize -> -- input size
CUChar -> -- separator character
Ptr CSize -> -- result array
IO CSize -- occupancy of result array

{-# INLINE getDelimiterIndices #-}
getDelimiterIndices ::
Word8 ->
Int ->
VS.Vector Word8 ->
IO (VS.Vector CSize)
getDelimiterIndices originalLen csvFile =
getDelimiterIndices separator originalLen csvFile =
VS.unsafeWith csvFile $ \buffer -> do
let paddedLen = VS.length csvFile
-- then number of delimiters cannot exceed the size
Expand All @@ -182,9 +202,10 @@ getDelimiterIndices originalLen csvFile =
get_delimiter_indices
(castPtr buffer)
(fromIntegral paddedLen)
(fromIntegral separator)
(castPtr indices)
if num_fields == -1
then getDelimiterIndices_ originalLen csvFile indices
then getDelimiterIndices_ separator originalLen csvFile indices
else do
indices' <- newForeignPtr_ indices
let resultVector = VSM.unsafeFromForeignPtr0 indices' paddedLen
Expand All @@ -202,10 +223,11 @@ getDelimiterIndices originalLen csvFile =
-- cannot be used. For example if neither ARM_NEON
-- nor AVX2 are available

lf, cr, comma, quote :: Word8
lf, cr, comma, tab, quote :: Word8
lf = 0x0A
cr = 0x0D
comma = 0x2C
tab = 0x09
quote = 0x22

-- We parse using a state machine
Expand All @@ -215,15 +237,17 @@ data State
deriving (Enum)

{-# INLINE stateTransitionTable #-}
stateTransitionTable :: UArray (Int, Word8) Int
stateTransitionTable = array ((0, 0), (1, 255)) [(i, f i) | i <- range ((0, 0), (1, 255))]
stateTransitionTable :: Word8 -> UArray (Int, Word8) Int
stateTransitionTable separator = array ((0, 0), (1, 255)) [(i, f i) | i <- range ((0, 0), (1, 255))]
where
-- Unescaped newline
f (0, 0x0A) = fromEnum UnEscaped
-- Unescaped comma
f (0, 0x2C) = fromEnum UnEscaped
-- Unescaped quote
f (0, 0x22) = fromEnum Escaped
f (0, character)
-- Unescaped newline
| character == 0x0A = fromEnum UnEscaped
-- Unescaped separator
| character == separator = fromEnum UnEscaped
-- Unescaped quote
| character == 0x22 = fromEnum Escaped
| otherwise = fromEnum UnEscaped
-- Escaped quote
-- escaped quote in fields are dealt as
-- consecutive quoted sections of a field
Expand All @@ -237,11 +261,12 @@ stateTransitionTable = array ((0, 0), (1, 255)) [(i, f i) | i <- range ((0, 0),

{-# INLINE getDelimiterIndices_ #-}
getDelimiterIndices_ ::
Word8 ->
Int ->
VS.Vector Word8 ->
Ptr CSize ->
IO (VS.Vector CSize)
getDelimiterIndices_ originalLen csvFile resultPtr = do
getDelimiterIndices_ separator originalLen csvFile resultPtr = do
resultVector <- resultVectorM
(_, resultLen) <-
VS.ifoldM'
Expand All @@ -262,6 +287,7 @@ getDelimiterIndices_ originalLen csvFile resultPtr = do
resultVectorM = do
resultForeignPtr <- newForeignPtr_ resultPtr
return $ VSM.unsafeFromForeignPtr0 resultForeignPtr paddedLen
transitionTable = stateTransitionTable separator
processCharacter ::
VSM.IOVector CSize ->
(State, Int) ->
Expand All @@ -275,7 +301,7 @@ getDelimiterIndices_ originalLen csvFile resultPtr = do
character =
case state of
UnEscaped ->
if character == lf || character == comma
if character == lf || character == separator
then do
VSM.write
resultVector
Expand All @@ -287,7 +313,7 @@ getDelimiterIndices_ originalLen csvFile resultPtr = do
where
newState =
toEnum $
stateTransitionTable
transitionTable
! (fromEnum state, character)

{-# INLINE countColumnsInFirstRow #-}
Expand Down
Loading