AbsaOSS / cobrix

A COBOL parser and Mainframe/EBCDIC data source for Apache Spark
Apache License 2.0

Issue with custom record parser #313

Open Kumar747 opened 4 years ago

Kumar747 commented 4 years ago

Hi,

We have a file with variable-length records, where the length is determined by the first 6 bytes (the file has multiple copybooks, so I merged all of them). I am using a custom record parser for this file; below are the sample parser and the main class that uses it. Issue: nothing matches those 6 bytes, and every record throws an error. I also tried reading the rejected records by giving a sample size of 1000, but all the records come back blank/null. In the code below I have given 5 sample patterns for the file; however, we have around 22 different patterns.

Can you please give us a suggestion on whether we are on the right path and how to achieve this?

Appreciate your cooperation.

Thanks!

package com.example.spark.cobol.app

import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordMetadata}

class CustomRecordHeadersParser extends Serializable with RecordHeaderParser {

  /* The custom record header is a 6 byte block at the start of each record */
  override def getHeaderLength: Int = 6

override def isHeaderDefinedInCopybook: Boolean = true

  override def getRecordMetadata(header: Array[Byte], fileOffset: Long, fileSize: Long, recordNum: Long): RecordMetadata = {
    val rdwHeaderBlock = getHeaderLength
    if (header.length < rdwHeaderBlock) {
      RecordMetadata(-1, isValid = false)
    } else if (header.map(_ & 0xFF).mkString("") == "01KJUG") {
      RecordMetadata(8034, isValid = true)
    } else if (header.map(_ & 0xFF).mkString("") == "40NJHY") {
      RecordMetadata(18034, isValid = true)
    } else if (header.map(_ & 0xFF).mkString("") == "87BGHO") {
      RecordMetadata(6575, isValid = true)
    } else if (header.map(_ & 0xFF).mkString("") == "09GHGT") {
      RecordMetadata(2678, isValid = true)
    } else if (header.map(_ & 0xFF).mkString("").take(3) == "HDR") {
      RecordMetadata(65, isValid = false)
    } else {
      //throw new IllegalStateException(s"Custom RDW headers is not matching with any of the patterns-")
      RecordMetadata(1000, isValid = true)
    }
  }

}

package com.example.spark.cobol.app

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object SparkCodecApp {

def main(args: Array[String]): Unit = {
  // Switch logging level to WARN
  Logger.getLogger("org").setLevel(Level.WARN)
  Logger.getLogger("akka").setLevel(Level.WARN)

val spark = SparkSession
  .builder()
  .appName("Spark-Cobol Custom header parser example")
  .getOrCreate()

val df = spark
  .read
  .format("cobol")
  .option("copybook", "../example_data/copybook_codec.cob")
  .option("is_record_sequence", "true")
  .option("generate_record_id", true)
  .option("schema_retention_policy", "collapse_root")
  .option("record_header_parser", "com.example.spark.cobol.app.CustomRecordHeadersParser") // Custom record header parser class
  .load("../example_data/data_codec/somefile.dat")

df.printSchema()

df.show()

}

}


yruslan commented 4 years ago

I'm not sure

header.map(_ & 0xFF).mkString("")

does what you expect. It might depend on the encoding of the header. If you have an example of the binary representation of your headers, I can advise on how to decode them.

For an ASCII header you can use new String(...):

val header = Array[Byte](0x54.toByte, 0x55.toByte, 0x56.toByte, 0x57.toByte)

header.map(_ & 0xFF).mkString("")         // res0: String = 84858687
new String(header, "ASCII")               // res1: String = TUVW
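
If your headers are EBCDIC text (as is usual for mainframe files), the same idea applies with an EBCDIC charset. A minimal sketch, assuming the JVM's "Cp1047" IBM EBCDIC code page (the exact code page of your system may differ, e.g. "Cp037"):

// "01KJUG" encoded in EBCDIC: digits are 0xF0-0xF9, uppercase letters fall in 0xC1-0xE9
val ebcdicHeader = Array[Byte](0xF0.toByte, 0xF1.toByte, 0xD2.toByte, 0xD1.toByte, 0xE4.toByte, 0xC7.toByte)

ebcdicHeader.map(_ & 0xFF).mkString("")   // res2: String = 240241210209228199
new String(ebcdicHeader, "Cp1047")        // res3: String = 01KJUG

With the header decoded this way, the comparisons in getRecordMetadata can match the literal patterns directly, e.g. new String(header, "Cp1047") == "01KJUG".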