Open pmetras opened 3 years ago
I need an example program in order to fix it.
Can someone remove the (Not reproduced) tag? I was able to craft and upload sample code that reproduces the problem in the previous comment.
import std/memfiles, std/streams, std/parsecsv, std/threadpool, std/cpuinfo
import std/parsecsv, std/os, std/streams, std/memfiles
{. experimental: "parallel" .}
#--------------------------------------------------------------------------------------
# Better to be in lib/pure/memfiles.nim.
type
  MemoryStream* = ref MemoryStreamObj
    ## A stream on a memory area
  MemoryStreamObj* = object of Stream
    ## Stream state backed by a caller-owned memory area. The stream
    ## never allocates or frees `mem`; the caller (e.g. a MemFile)
    ## owns it and must keep it alive while the stream is in use.
    mem: pointer      # start of the backing memory area (not owned)
    size: int         # size of the area in bytes
    mode: FileMode    # fmRead makes the stream read-only (see msWriteData)
    pos: ByteAddress  # current offset from `mem`; -1 once closed (see msClose)
proc msClose(s: Stream) =
  ## Closes the stream by setting the position to the sentinel value -1,
  ## which makes msAtEnd report true. The backing memory is not freed
  ## (it is owned by the caller).
  MemoryStream(s).pos = -1
proc msAtEnd(s: Stream): bool =
  ## True when the cursor has reached the end of the memory area, or
  ## when the stream has been closed (position set to the -1 sentinel).
  let ms = MemoryStream(s)
  result = ms.pos < 0 or ms.pos >= ms.size
proc msSetPosition(s: Stream, pos: int) =
  ## Moves the cursor to `pos`, an offset from the start of the memory
  ## area. `pos == size` is allowed and means "at end".
  ## Raises IOError when `pos` lies outside [0, size].
  if pos > MemoryStream(s).size or pos < 0:
    # Fixed error-message typo: "with" -> "within".
    raise newException(IOError,
      "Cannot set pos in stream; pos must be within memory area bounds.")
  MemoryStream(s).pos = pos
proc msGetPosition(s: Stream): int =
  ## Returns the current cursor offset from the start of the memory area
  ## (-1 once the stream has been closed).
  MemoryStream(s).pos
proc msPeekData(s: Stream, buffer: pointer, bufLen: int): int =
  ## Copies up to `bufLen` bytes from the current position into `buffer`
  ## without advancing the cursor. Returns the number of bytes copied:
  ## 0 when the stream is closed, at end, or `bufLen <= 0`.
  let ms = MemoryStream(s)
  if ms.pos < 0 or ms.pos >= ms.size:
    # Closed (-1) or at/past end: the original computed a negative copy
    # length here and passed it to moveMem, which is undefined behavior.
    return 0
  let startAddress = cast[ByteAddress](ms.mem)
  let p = ms.pos
  # Clamp so moveMem never receives a negative length (e.g. bufLen < 0).
  let l = max(0, min(bufLen, ms.size - p))
  moveMem(buffer, cast[pointer](startAddress + p), l)
  result = l
proc msReadData(s: Stream, buffer: pointer, bufLen: int): int =
  ## Reads like msPeekData, then advances the cursor by the number of
  ## bytes that were actually copied.
  let copied = msPeekData(s, buffer, bufLen)
  inc(MemoryStream(s).pos, copied)
  result = copied
proc msWriteData(s: Stream, buffer: pointer, bufLen: int) =
  ## Copies `bufLen` bytes from `buffer` into the memory area at the
  ## current position and advances the cursor.
  ## Raises IOError when the stream is read-only, closed, or the write
  ## would run past the end of the memory area.
  let ms = MemoryStream(s)
  if ms.mode == fmRead:
    raise newException(IOError, "Cannot write to read-only stream.")
  if ms.pos < 0:
    # Closed stream: without this guard, pos = -1 passes the bounds
    # check below and the write starts one byte BEFORE the memory area.
    raise newException(IOError, "Cannot write to closed stream.")
  if ms.pos + bufLen > ms.size:
    raise newException(IOError, "Cannot write to stream over memory area boundary.")
  let dest = cast[ByteAddress](ms.mem) + ms.pos
  moveMem(cast[pointer](dest), buffer, bufLen)
  inc(ms.pos, bufLen)
proc newMemoryStream*(mem: pointer; size: int; mode: FileMode = fmRead): MemoryStream =
  ## Creates a new stream over a caller-owned memory area of `size` bytes.
  ## The size is mandatory and must be positive; `mem` must not be nil.
  ## The stream never frees `mem`; the caller must keep it alive.
  # Validate arguments before constructing anything, and use doAssert so
  # the checks survive release builds (assert is stripped by -d:danger).
  doAssert mem != nil, "The memory area must not be nil."
  doAssert size > 0, "The memory area size must be positive."
  new(result)
  result.mem = mem
  result.size = size
  result.mode = mode
  result.pos = 0  # explicit: reading/writing starts at the beginning
  result.closeImpl = msClose
  result.atEndImpl = msAtEnd
  result.setPositionImpl = msSetPosition
  result.getPositionImpl = msGetPosition
  result.readDataImpl = msReadData
  result.peekDataImpl = msPeekData
  result.writeDataImpl = msWriteData
proc newMemoryStream*(slice: MemSlice; mode: FileMode = fmRead): MemoryStream =
  ## Creates a new stream from a memory slice (e.g. one obtained from a
  ## MemFile). Convenience overload forwarding to the pointer+size version.
  result = newMemoryStream(slice.data, slice.size, mode)
#--------------------------------------------------------------------------------------
type
  # The initial CSV file is split in N equal-size blocks, where N is the number
  # of cores of the CPU. Then we collect statistics on each block in a
  # `BlockCounter`.
  BlockCounter = object
    nbQuotes: int        # number of quote characters seen in the block
    evenNewLine: bool    # true once a '\n' after an even quote count was seen
    evenNewLinePos: int  # offset of that first "even" newline within the block
    nbEvenNewLines: int  # count of newlines seen after an even quote count
    oddNewLine: bool     # true once a '\n' after an odd quote count was seen
    oddNewLinePos: int   # offset of that first "odd" newline within the block
    nbOddNewLines: int   # count of newlines seen after an odd quote count
  # We adjust the initial blocks to fit with record lines. Each `AdjustedBlock`
  # contains a start and end position relative to the initial file, and an
  # estimate of the number of records in it (obtained by counting the end of
  # lines).
  AdjustedBlock = object
    startPos: int    # inclusive start offset in the file
    endPos: int      # end offset in the file (position of the closing '\n')
    nbNewLines: int  # estimated record count, used to pre-size seqs
  # The CSV content
  CsvTable = object
    headers*: seq[string]  # header row (empty when the header was skipped)
    rows*: seq[CsvRow]     # parsed data rows
  CsvTableRef = ref CsvTable
func calcStats(data: ptr UncheckedArray[char]; chunkOffset: int; chunkSize: int; quote: char): BlockCounter =
  ## Scans `chunkSize` bytes of `data` starting at `chunkOffset` and collects:
  ## 1) the number of quotes within the block;
  ## 2) the position of the first newline after an even number of quotes
  ##    within the block;
  ## 3) the position of the first newline after an odd number of quotes
  ##    within the block.
  ## The per-parity newline counts give an estimate of the number of CSV
  ## records, used later to limit memory reallocations.
  for pos in chunkOffset ..< chunkOffset + chunkSize:
    let ch = data[pos]
    if ch == quote:
      inc result.nbQuotes
    elif ch == '\n':
      if result.nbQuotes mod 2 == 0:
        inc result.nbEvenNewLines
        if not result.evenNewLine:
          result.evenNewLine = true
          result.evenNewLinePos = pos
      else:
        inc result.nbOddNewLines
        if not result.oddNewLine:
          result.oddNewLine = true
          result.oddNewLinePos = pos
proc find_adjusted_blocks(memf: MemFile; quote: char): seq[AdjustedBlock] =
  ## Splits the memory-mapped file into one roughly equal chunk per CPU core,
  ## scans the chunks in parallel with `calcStats`, then merges the per-chunk
  ## statistics into blocks whose boundaries fall on record (newline)
  ## boundaries, so each adjusted block can be parsed independently.
  ## Each worker scans the assigned block and collects three statistics:
  ## 1) the number of quotes within the block;
  ## 2) the position of the first newline after an even number of quotes within the block;
  ## 3) the position of the first newline after an odd number of quotes within the block.
  let maxThreads = if countProcessors() == 0: 2 else: countProcessors()
  var blockCounters = newSeq[BlockCounter](maxThreads)
  # Load and map to memory a block of CSV data for end of records analysis
  let data = cast[ptr UncheckedArray[char]](memf.mem)
  # Chunk sizes: the first `remainder` chunks get one extra byte so the
  # chunks cover the whole file exactly.
  let baseChunkSize = memf.size div maxThreads
  let remainder = memf.size mod maxThreads
  parallel:
    for i in 0 ..< blockCounters.len:
      var chunkOffset: int
      var chunkSize: int
      # Split the memory block per thread
      if i < remainder:
        chunkOffset = (baseChunkSize + 1) * i
        chunkSize = baseChunkSize + 1
      else:
        chunkOffset = baseChunkSize * i + remainder
        chunkSize = baseChunkSize
      blockCounters[i] = spawn calcStats(data, chunkOffset, chunkSize, quote)
  sync()
  #echo "Merged blockCounters=" & $blockCounters
  # Now we have counted the statistics in the blocks. We iterate over the
  # statistics of all blocks and computes the starting positions of all
  # adjusted blocks. We can have less adjusted blocks that the initial number
  # of blocks, for instance if the CSV record size is larger than a block size.
  var
    adjustedBlocks: seq[AdjustedBlock] = @[]
    nbQuotes = blockCounters[0].nbQuotes  # running quote parity across blocks
    nbNewLines = 0                        # record estimate for current block
    startPos = 0
    endPos = memf.size - 1
    adjustedBlock: AdjustedBlock
  for i in 1 ..< blockCounters.len:
    inc(nbQuotes, blockCounters[i].nbQuotes)
    if ((blockCounters[i].evenNewLine or blockCounters[i].oddNewLine) and i > 1) or
        # The block contains a '\n', so at least one CSV record.
        (i == 1 and (blockCounters[i].nbEvenNewLines > 2 or blockCounters[i].nbOddNewLines > 2)):
      # The first block must contain at least 2 lines: 1 header + 1 record
      # If the running quote count is even, the "even" newline of this block
      # ends a record; otherwise the "odd" newline does.
      if nbQuotes mod 2 == 0:
        endPos = blockCounters[i].evenNewLinePos
        # NOTE(review): the boundary comes from block i but the line count
        # from block i-1 — looks intentional (count belongs to the block
        # being closed) but verify against a multi-core run.
        inc(nbNewLines, blockCounters[i - 1].nbEvenNewLines)
      else:
        endPos = blockCounters[i].oddNewLinePos
        inc(nbNewLines, blockCounters[i - 1].nbOddNewLines)
      adjustedBlock.startPos = startPos
      adjustedBlock.endPos = endPos # We must include '\n'
      adjustedBlock.nbNewLines = nbNewLines
      adjustedBlocks.add(adjustedBlock)
      startPos = endPos + 1
      nbNewLines = 0
  # We must add the last block, which is usually smaller than the preceding
  # ones.
  if endPos < memf.size - 1:
    adjustedBlock.startPos = startPos
    adjustedBlock.nbNewLines = if nbQuotes mod 2 == 0: nbNewLines + blockCounters[blockCounters.len - 1].nbEvenNewLines
                               else: nbNewLines + blockCounters[blockCounters.len - 1].nbOddNewLines
    adjustedBlock.endPos = memf.size - 1
    adjustedBlocks.add(adjustedBlock)
  # There is only 1 block (the loop above never ran, endPos == memf.size - 1)
  elif blockCounters.len == 1:
    adjustedBlock.startPos = startPos
    adjustedBlock.endPos = endPos
    adjustedBlock.nbNewLines = nbNewLines
    adjustedBlocks.add(adjustedBlock)
  #echo "adjustedBlocks=" & $adjustedBlocks
  result = adjustedBlocks
proc parseCsvChunk(threadId: int; nbNewLines: int; stream: Stream; csvPath: string; readHeader: bool = false; separator: char = ','; quote: char = '"'): CsvTableRef =
  ## Parses one CSV chunk from `stream` into a fresh CsvTableRef.
  ## `nbNewLines` is an estimated row count used to pre-size the rows seq;
  ## `csvPath` is only used by the parser for error messages. When
  ## `readHeader` is true the first row is consumed as the header row.
  ## `threadId` is unused but kept for signature compatibility with callers.
  var csv: CsvParser
  csv.open(stream, csvPath, separator = separator, quote = quote, skipInitialSpace = true)
  # Release the parser (and its underlying stream) even on a parse error;
  # the original never closed it, leaking one parser per chunk.
  defer: csv.close()
  result = new CsvTableRef
  if readHeader:
    csv.readHeaderRow
    result.headers = csv.headers
  result.rows = newSeqOfCap[CsvRow](nbNewLines)
  while csv.readRow:
    result.rows.add csv.row
proc read_csv_para*(csvPath: string, skip_header = false, separator = ',', quote = '"'): CsvTable =
  ## Load a csv into a `CsvParser` using all available CPU cores.
  ##
  ## If there is a header row, it can be skipped.
  ##
  ## Input:
  ## - csvPath: a path to the csvfile
  ## - skip_header: should read_csv_para skip the first row
  ## - separator: a char, default ','
  ## - quote: a char, default '"' (single and double quotes must be escaped).
  ##   Separators inside quoted strings are ignored, for example:
  ##   `"foo", "bar, baz"` corresponds to 2 columns not 3.
  var memf = memfiles.open(csvPath)
  # Split the file on record boundaries; one chunk per adjusted block.
  let adjustedBlocks = find_adjusted_blocks(memf, quote)
  var data: seq[FlowVar[CsvTableRef]] = newSeq[FlowVar[CsvTableRef]](adjustedBlocks.len)
  parallel:
    for i in 0 ..< adjustedBlocks.len:
      let mem = cast[pointer](cast[ByteAddress](memf.mem) + cast[ByteAddress](adjustedBlocks[i].startPos))
      # NOTE(review): endPos is the offset of the closing '\n' (inclusive),
      # so endPos - startPos excludes the final byte of the block — confirm
      # whether `+ 1` was intended here.
      let size = adjustedBlocks[i].endPos - adjustedBlocks[i].startPos
      let stream = newMemoryStream(mem, size, mode = fmRead)
      let nbNewLines = adjustedBlocks[i].nbNewLines
      # Only the first chunk carries the header row (unless skipped).
      let readHeader = i == 0 and not skip_header
      #let ms = MemSlice(data: mem, size: size)
      #echo "Parse #" & $i & " from " & $cast[int](mem) & " for " & $size & ": " & ($ms)[0 .. 30] & "..." & ($ms)[^30 .. ^1]
      data[i] = spawn parseCsvChunk(i, nbNewLines, stream, csvPath, readHeader, separator, quote)
  sync()
  # We can close the memory file now: all workers finished at sync(), so no
  # stream still points into the mapping.
  memf.close()
  # Collect and merge all results
  # Pre-size the merged rows seq with the total estimated record count.
  var nbNewLines = 0
  for i in 0 ..< adjustedBlocks.len:
    inc(nbNewLines, adjustedBlocks[i].nbNewLines)
  result.rows = newSeqOfCap[CsvRow](nbNewLines)
  for i in 0 ..< data.len:
    let csv = ^data[i]  # blocks until the FlowVar is ready (already is)
    #echo "#" & $i & " headers=" & $csv.headers & " rows=" & $csv.rows
    if i == 0:
      result.headers = csv.headers
    result.rows.add csv.rows
type
  Options = object
    ## Command-line options for the demo driver.
    parserType: string  # parser selector (only "para" is used here)
    csvFile: string     # path to the CSV file to parse
proc paraParser(options: Options) =
  ## Parses `options.csvFile` with all CPU cores and prints the result.
  let table = read_csv_para(options.csvFile)
  echo "Read " & $table.rows.len & " rows"
  echo "Headers=" & $table.headers
  # Debug toggle kept from the original: the dead branch would print only
  # the first and last rows; the live branch dumps every row.
  if false:
    echo "First line=" & $table.rows[0]
    echo "Last line=" & $table.rows[table.rows.len - 1]
  else:
    for row in items(table.rows):
      echo $row
proc main* =
  ## Entry point: parses the CSV file named by the first command-line
  ## argument. Exits with a usage message instead of raising IndexDefect
  ## when the argument is missing (the original crashed on `paramStr(1)`).
  if paramCount() < 1:
    quit("Usage: program <csvfile>")
  let options = Options(parserType: "para", csvFile: paramStr(1))
  paraParser(options)
main()
The Nim compiler crashes with SIGSEGV on a parallel program when using
--gc:arc
Example
I can't build a simple example from my code for the moment.
Current Output
Expected Output
Successful compilation, as with the default GC.
Additional Information
Same behaviour with
--gc:orc
...