rkwahile / public

0 stars 0 forks source link

Spark / Map Reduce #5

Open rkwahile opened 6 years ago

rkwahile commented 6 years ago

WEBHDFS---------- https://community.hortonworks.com/questions/139351/how-to-upload-a-file-to-hdfs-using-webhdfs-rest-ap.html

https://data-flair.training/blogs/apache-spark-interview-questions-and-answers/

Scala Code----------------------- val rowRDD = df.map(_.split(",")).map(p => Row(p(0),p(1),p(2),p(3),p(4),p(5),p(6),p(7),p(8),p(9),p(10),p(11),p(12),p(13))) val MainDF = sqlContext.createDataFrame(rowRDD,schemaDF) val FilterDF1 = MainDF.filter(MainDF("srt_criteria") === "VPLS" || MainDF("srt_criteria") === "MPLS" || MainDF("srt_criteria") === "Lmda") val WindowDF = FilterDF1.select(col("op_date"),col("fac_id"),col("pkg_id_cd"),col("srt_criteria"),col("usr_id"),col("app_mode_cd"),col("usr_id"),col("srt_id"),col("hub_cd"),col("dist_nm"),col("dvc_id"),row_number().over(Window.partitionBy(col("op_date"),col("fac_id"),col("pkg_id_cd")).orderBy(col("event_tmstp").desc)).alias("Desc_event_tmstp")) val FilterDF2 = WindowDF.filter($"Desc_event_tmstp" === 1) val FilterDF4 = FilterDF2.filter(FilterDF2("srt_criteria") === "MPLS") val CountDistinctDF = FilterDF4.groupBy("usr_id","app_mode_cd","srt_id","op_date").agg(countDistinct($"pkg_id_cd")).as("count_pkg_id_cd")

Rank------- val TestExample3 = DataFrame.groupBy($"usr_id",$"srt_nm",$"app_mode_cd",$"srt_id",$"op_date",$"event_tmstp",$"pkg_id_cd",$"sort_action").agg(max($"event_tmstp")).where($"sort_action" === "TEMP").withColumn("count_pkg_id",row_number().over(Window.partitionBy($"event_tmstp").orderBy($"pkg_id_cd")))

-----Spark Latest----- Doc Q's : HIVE, HIVE UDF for avg soln( SUM AND COUNT )FOR AVG,

Spark RDD, lazy eval..

static block, oops hashmap https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Column.html --------Spark Latest-------- Difference between application master and application manager. Difference between fair and capaciy scheduler. Difference between RDD, dataframe and Dataset. Catalyst Analyzer. calculate max of salary using RDD. Calculate square of list. Wordcount in spark. Difference between reduceByKey, aggregateByKey, and groupByKey. Difference between repartition and coalesce. How transformation are lazy in spark difference between ORC and Parquet. Shared Variables (broadcast and accumulator) internal Job execution flow of spark.

--------Predicate Push down-------- http://bigdatums.net/2017/08/29/what-is-predicate-pushdown/ Filtering data before its loaded in memory of trf'd to network.. ORC maintains stats like min/max of columns in chunks.. ----Dzone Spark---- https://dzone.com/refcardz/apache-spark

----------PYSPARK------------ https://stackoverflow.com/questions/48554619/count-number-of-duplicate-rows-in-sparksql

---------Data IKU Add all findings to doc...

  1. 502 gateway.. 1,2,3 points of email.

Check basic libraries. numpy
Try and implement bharath usecase in dataiku..

numpy,pandas, theanos,

----Data formats---- https://techmagie.wordpress.com/category/big-data/data-formats/ ----CG EBCDIC "Copy Book", AsCII

https://dzone.com/articles/some-lessons-of-spark-and-memory-issues-on-emr

https://dzone.com/articles/how-cloudera-uses-open-source ------------Certification------------ https://hortonworks.com/services/training/certification/exam-objectives/#hdpcdjava

DS ML Model https://www.kaggle.com/c/titanic

Spark vs Hadoop https://dzone.com/articles/hadoop-vs-spark-a-head-to-head-comparison

https://www.toptal.com/spark/introduction-to-apache-spark ----------13022018---------- http://www.codecommit.com/blog/scala/scala-for-java-refugees-part-1 ----------04012018---------- https://stackoverflow.com/questions/22933974/how-to-parse-json-from-elasticsearch-into-array-without-the-meta-info Scala--------------------------http://www.dhgarrette.com/nlpclass/scala/basics.html

machine-learning-resources--------------------https://dzone.com/articles/machine-learning-resources spark Definition : Spark is a distributed programming model where the user specifies transformations, which build up a directed-acyclic-graph of instructions, and actions, which begin the process of executing that graph of instructions, as a single job, by breaking it down into stages and tasks to execute across the cluster.

created a data frame and accessed it using Sparksql (Simple one) and scala methods like groupBy().sum().sort() ...

-------For spark version 1.6------- there is a need to create a sqlContext object with Sparksql val sqlContext = new org.apache.spark.sql.SQLContext(sc)

create a dataframe val df = sqlContext.read.json("/apps/hive/warehouse/sample_json/2015-summary.json")

accessing it through SQL

val maxSql = sqlContext.sql("""select DEST_COUNTRY_NAME,sum(count) as dest_total FROM df GROUP BY DEST_COUNTRY_NAME ORDER BY sum(count) DESC LIMIT 5 """)

accessing it through Scala methods

df.groupBy("DEST_COUNTRY_NAME").sum("count").withCoulmnRenamed("sum(count)","desti_tot").sort(desc("desti_tot")).limit(5).collect()

rkwahile commented 6 years ago

---------------------MAPREDUCE---------------------

The key class of a mapper that maps text files is always LongWritable. That is because it contains the byte offset of the current line and this could easily overflow an integer.

https://stackoverflow.com/questions/14922087/hadoop-longwritable-cannot-be-cast-to-org-apache-hadoop-io-intwritable

rkwahile commented 6 years ago

------------PYSPARK------------ https://databricks.com/session/data-wrangling-with-pyspark-for-data-scientists-who-know-pandas

  1. Accessing hive table and finding duplicates. from pyspark.sql import SQLContext,HiveContext from pyspark import SparkContext

    import pandas

    from pyspark.sql.types import* import pyspark.sql.functions as f

sc =SparkContext.getOrCreate() sqlContext =HiveContext(sc)

df_load = sqlContext.sql("select employeeName,department from emp") dfdup = df_load.groupBy(df_load.columns).count().where(f.col('count') > 1).select(f.sum('count')) dfdup.show()

rkwahile commented 6 years ago

----------NLP EXAMPLE---------

import pandas as pd import nltk dfGlobalMaster = pd.read_csv("filename.csv", sep=',') for line in dfGlobalMaster['columnName']:

print (line)

    text_pos = nltk.pos_tag(line.split())
    #print(text_pos)
    grammar = r"""
            NP: {<RB><JJR><IN><CD><NNS|NN>}
            NP1: {<VBG><CD><NN|NNS><CC><JJR>}
            NP2: {<JJR><IN><CD><NNS|NN>}
            NP3: {<VBG><RB><RBR><IN><CD><NNS|NN>}
            NP4: {<RB><VBG><CD><NN|NNS>}
            NP5: {<VBG><CD><NN|NNS>}
            NP6: {<VBG><CD><NNS|NN><CC><RB><VBG><CD><NN|NNS>}
                 }<CC>{
            NP7: {<CD><NNS|NN><CC><JJR><CC><RB><VBG><CD><NN|NNS>}
            """
    parser = nltk.RegexpParser(grammar)
    #print(parser)
    parsenum = parser.parse(text_pos)
    #print(parsenum)
    min_value = ['0']
    max_value = ['NO Limit']
    unit = ''
    text_NP = [tree.leaves() for tree in parsenum.subtrees() if tree.label() == 'NP']
    #print(text)
    if (len(text_NP) > 0):
        for sub in text_NP:
            for (word,tags) in sub:
                if tags == "JJR":
                    if word == "less":
                        min_value = [word for word,tags in sub if (tags == 'CD')]
                        #print(min)
                    elif word == "more":
                        max_value = [word for word,tags in sub if (tags == 'CD')]
                        #print(max)
                unit = [word for word,tags in sub if (tags == 'NNS' or tags == 'NN')]
        list = [line,min_value,max_value,unit]
        print(list)
    text_NP1 = [tree.leaves() for tree in parsenum.subtrees() if tree.label() == 'NP1']
    if (len(text_NP1) > 0):
        for sub in text_NP1:
            for (word,tags) in sub:
                if tags == "JJR":
                    if word == "more":
                        min_value = [word for word,tags in sub if (tags == 'CD')]
                        #print(min)
                    elif word == "less":
                        max_value = [word for word,tags in sub if (tags == 'CD')]
                        #print(max)
                unit = [word for word,tags in sub if (tags == 'NN' or tags == 'NNS')]
        list = [line,min_value,max_value,unit]
        print(list)
    text_NP2 = [tree.leaves() for tree in parsenum.subtrees() if tree.label() == 'NP2']
    if (len(text_NP2) > 0):
        for sub in text_NP2:
            for (word,tags) in sub:
                if tags == "JJR":
                    if word == "less":
                        max_value = [word for word,tags in sub if (tags == 'CD')]
                    #print(min)
                elif word == "more":
                        min_value = [word for word,tags in sub if (tags == 'CD')]
                    #print(max)
                unit = [word for word,tags in sub if (tags == 'NNS' or tags == 'NN')]
        list = [line,min_value,max_value,unit]
        print(list)
    text_NP3 = [tree.leaves() for tree in parsenum.subtrees() if tree.label() == 'NP3']
    if (len(text_NP3) > 0):
        for sub in text_NP3:
            for (word,tags) in sub:
                if tags == "RBR":
                    if word == "more":
                        max_value = [word for word,tags in sub if (tags == 'CD')]
                    #print(min)
                    elif word == "less":
                        min_value = [word for word,tags in sub if (tags == 'CD')]
                    #print(max)
                unit = [word for word,tags in sub if (tags == 'NNS' or tags == 'NN')]
        list = [line,min_value,max_value,unit]
        print(list)
    text_NP7 = [tree.leaves() for tree in parsenum.subtrees() if tree.label() in('NP7')]
    if (len(text_NP7) > 0):
        for sub in text_NP:
            grammar_NP7 = r"""
                        NP7_1: {<CD><NNS|NN><CC><JJR>}
                        NP7_2: {<RB><VBG><CD><NN|NNS>}
                        """
            parser_NP7 = nltk.RegexpParser(grammar_NP7)
            parsenum_NP7 = parser_NP7.parse(sub)
            text_NP7_1 = [tree.leaves() for tree in parsenum_NP7.subtrees() if tree.label() in('NP7_1','NP7_2')]
            for sub in text_NP7_1:
                list1 = []
                list2 = []
                for i in sub:
                    list1.append(i[1])
                    list2.append(i[0])
                    if list1 in (['CD', 'NNS', 'CC', 'JJR'],['CD', 'NN', 'CC', 'JJR']):
                        min_value = [list2[0]]
                    if list1 in (['RB', 'VBG', 'CD', 'NN'],['RB', 'VBG', 'CD', 'NNS']):
                        max_value = [list2[2]]
            unit = [word for word,tags in sub if (tags == 'NNS' or tags == 'NN')]
        list = [line,min_value,max_value,unit]
        print(list)
    text_NP4 = [tree.leaves() for tree in parsenum.subtrees() if tree.label() in ('NP6','NP4','NP5')]
    if (len(text_NP4) > 0):
        for sub in text_NP4:
            list1 = []
            list2 = []
            for i in sub:
                list1.append(i[1])
                list2.append(i[0])
                if list1 in (['RB', 'VBG', 'CD', 'NN'],['RB', 'VBG', 'CD', 'NNS']):
                    max_value = [list2[2]]
                if list1 in (['VBG', 'CD', 'NNS'],['VBG', 'CD', 'NN']):
                    min_value = [list2[1]]
            unit = [word for word,tags in sub if (tags == 'NNS' or tags == 'NN')]
        list = [line,min_value,max_value,unit]
        print(list)
    #text_NP7 = [tree.leaves() for tree in parsenum.subtrees() if tree.label() in('NP7')]
    #if (len(text_NP7) > 0):
    #    for sub in text_NP:
    #        grammar_NP7 = r"""
    #                    NP7_1: {<CD><NNS|NN><CC><JJR>}
    #                    NP7_2: {<RB><VBG><CD><NN|NNS>}
    #                    """
    #        parser_NP7 = nltk.RegexpParser(grammar_NP7)
    #        parsenum_NP7 = parser1.parse(sub)
    #        text_NP7_1 = [tree.leaves() for tree in parsenum1.subtrees() if tree.label() in('NP7_1','NP7_2')]
    #        for sub in text_NP7_1:
    #            list1 = []
    #            list2 = []
    #            for i in sub:
    #                list1.append(i[1])
    #                list2.append(i[0])
    #                if list1 in (['CD', 'NNS', 'CC', 'JJR'],['CD', 'NN', 'CC', 'JJR']):
    #                    min_value = [list2[0]]
    #                if list1 in (['RB', 'VBG', 'CD', 'NN'],['RB', 'VBG', 'CD', 'NNS']):
    #                    max_value = [list2[2]]
    #        unit = [word for word,tags in sub if (tags == 'NNS' or tags == 'NN')]
    #    list = [line,min_value,max_value,unit]
    #    print(list)
    #text_NP4 = [tree.leaves() for tree in parsenum.subtrees() if tree.label() == 'NP4']
    #if (len(text_NP4) > 0):
     #   for sub in text_NP4:
      #      for (word,tags) in sub:
       #         max_value = [word for word,tags in sub if (tags == 'CD')]
        #        unit = [word for word,tags in sub if (tags == 'NNS' or tags == 'NN')]
        #list = [line,min_value,max_value,unit]
        #print(list)
rkwahile commented 6 years ago

-------------Spark Certification-------------

https://www.quora.com/Can-someone-share-the-pattern-of-Hortonworks-HDPCD-spark-certification-sample-questions

rkwahile commented 6 years ago

--------------------------------------Log Parsing Map Reduce Multi File Output--------------------------------------

http://santoshsorab.blogspot.com/2014/12/creating-multiple-output-folder-in.html

package com.mapper;

import java.io.FileNotFoundException; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.LinkedHashMap; import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;

import com.google.gson.Gson;

public class CommodityKeywordLookupLogParser extends Configured implements Tool {

public static class MultipleMapper extends Mapper<LongWritable, Text, Text, LongWritable> { String fileNameStamp = new SimpleDateFormat("yyyy-MM-dd").format(new Date()); Pattern pattern = null; MultipleOutputs multipleOutputs;

@Override() protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException { multipleOutputs = new MultipleOutputs(context); }

protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, java.lang.InterruptedException {

String inputValues = value.toString();

if (inputValues.contains("request:")) { String requestResult = request(inputValues); // System.out.println(result);

    multipleOutputs.write(NullWritable.get(), new Text(requestResult ), "Request_"+fileNameStamp);

} 
else if (inputValues.contains("reply:")) {
    String resultReply = reply(inputValues);
    multipleOutputs.write(NullWritable.get(), new Text(resultReply ), "Reply_"+fileNameStamp);
} 

else if (inputValues.contains("Invoke")) {
    String resultInvoke = invoke(inputValues);
    multipleOutputs.write(NullWritable.get(), new Text(resultInvoke), "Invoke_"+fileNameStamp);
}

// String fileName = generateFileName(values[0]);

// multipleOutputs.write(NullWritable.get(), value, fileName);

}

@Override() protected void cleanup(Context context) throws java.io.IOException, InterruptedException { multipleOutputs.close(); }

private static String generateFileName(String values) { return values + "/" + values; }

}

@Override public int run(String[] args) throws Exception { Job job = new Job(getConf()); job.setJarByClass(CommodityKeywordLookupLogParser.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(CommodityKeywordLookupLogParser.MultipleMapper.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); job.setNumReduceTasks(0); job.waitForCompletion(true); return 0; }

public static void main(String[] args) throws Exception, ClassNotFoundException, InterruptedException { ToolRunner.run(new CommodityKeywordLookupLogParser(), args); }

static String request(String line) throws FileNotFoundException { String[] elements; String[] firstRequest; elements = line.split(","); firstRequest = elements[0].split(" "); String[] combinedarray = new String[elements.length + (firstRequest.length - 1)]; String[] finalarray = new String[combinedarray.length]; String[] finalseparatedarray = new String[finalarray.length]; String[] withoutspacearray = new String[finalseparatedarray.length]; int count = 0; for (int i = 0; i < firstRequest.length; i++) { combinedarray[i] = firstRequest[i]; count++; } for (int j = 1; j < (elements.length); j++) { combinedarray[count++] = elements[j]; } count = 0; for (int k = 0; k < combinedarray.length; k++) { if (combinedarray[k].contains("}") && combinedarray[k].contains("{")) { String part_string1 = combinedarray[k].substring(0, combinedarray[k].indexOf('}')); String part_string2 = combinedarray[k].substring(combinedarray[k].indexOf('{') + 1, combinedarray[k].length()); finalarray[count] = part_string1; finalarray[count + 1] = part_string2; count++; } else { finalarray[count] = combinedarray[k]; count++; } } for (int l = 0; l < finalarray.length; l++) { if (finalarray[l].contains("{")) { int delimiter_location = finalarray[l].indexOf('{'); String part_string = finalarray[l].substring(delimiter_location + 1, finalarray[l].length()); finalseparatedarray[l] = part_string; } else if (finalarray[l].contains("}")) { int delimiter_location = finalarray[l].indexOf('}'); String part_string = finalarray[l].substring(0, delimiter_location); finalseparatedarray[l] = part_string; } else { finalseparatedarray[l] = finalarray[l]; } } count = 0; for (int l = 0; l < finalseparatedarray.length; l++) { if (!finalseparatedarray[l].isEmpty() && !finalseparatedarray[l].contains("request:")) { withoutspacearray[count] = finalseparatedarray[l].trim(); count++; } } int len_withoutspacearray = count; /for(int i=0;i<len_withoutspacearray;i++) { System.out.println(withoutspacearray[i]); }/ // System.out.println(count); LinkedHashMap<String, String> holdRequestMap = new LinkedHashMap<String, String>(); holdRequestMap.put("Request_Execution_Date", withoutspacearray[0]); holdRequestMap.put("Request_Execution_Time", withoutspacearray[1]); holdRequestMap.put("Request_Running_Thread_Name", withoutspacearray[2]); holdRequestMap.put("Request_Node_Name", withoutspacearray[3]); holdRequestMap.put("Request_Type_Of_Server", withoutspacearray[4]); holdRequestMap.put("Request_Server_SubType", withoutspacearray[5]); holdRequestMap.put("Request_Service_Class_Method", withoutspacearray[6]); holdRequestMap.put("Request_ConversationID", withoutspacearray[7].substring(withoutspacearray[7].indexOf("=") + 1, withoutspacearray[7].length())); holdRequestMap.put("Request_ApplicationID", withoutspacearray[8]); holdRequestMap.put("Request_Unique_UserID", withoutspacearray[9]); holdRequestMap.put("Request_Service_Name", withoutspacearray[10]);

     String Request_ColumnName="accountNumber,groundShipperNumber,meterNumber,masterMeterNumber,meterInstance,companyId,softwareId,softwareRelease,clientProductId,clientProductVersion,middlewareProductId,middlewareProductVersion,integratorId,region,autoConfigurationType,cspCredentialKey,userCredentialKey,content,languageCode,localeCode,userId,password,uniqueUserId,customerTransactionId,languageCode,localeCode,internalTransactionId,tracing,sourceFormat,environment,serviceId,major,intermediate,minor,applicationId,keywordTypes,commodityId,name,numberOfPieces,description,purpose,countryOfManufacture,harmonizedCode,units,value,quantity,quantityUnits,quantity,units,currency,amount,currency,amount,category,value,exportLicenseNumber,exportLicenseExpirationDate,cIMarksAndNumbers,partNumber,preferenceCriterion,producerDetermination,producerId,netCostMethod,begins,ends";

  String[] Request_ColumnNameArray;
  Request_ColumnNameArray=Request_ColumnName.split(",");

  for(int j=0;j<Request_ColumnNameArray.length;j++) {
               int contain = 0;
               out: for (int i = 11; i < len_withoutspacearray; i++) {
                      if (withoutspacearray[i].contains(Request_ColumnNameArray[j])) {

// System.out.println(withoutspacearray[i]+" "+st); holdRequestMap.put(Request_ColumnNameArray[j], withoutspacearray[i].substring(withoutspacearray[i].indexOf(":") + 1, withoutspacearray[i].length())); break out; } else { contain++; } if (contain != 0 && i == len_withoutspacearray-1) { holdRequestMap.put(Request_ColumnNameArray[j], "null"); } } } Gson gson = new Gson(); String json = gson.toJson(holdRequestMap, LinkedHashMap.class); holdRequestMap.clear(); //System.out.println(json); return json; }

//Function to fetch columns for reply static String reply(String line) throws FileNotFoundException { if (line.contains("alternateKeywords()") || line.contains("alternateKeywords(0)") || !line.contains("alternateKeywords")) { String[] elements; String[] firstRequest; elements = line.split(","); firstRequest = elements[0].split(" "); String[] combinedarray = new String[elements.length + (firstRequest.length - 1)]; String[] finalarray = new String[combinedarray.length]; String[] finalseparatedarray = new String[finalarray.length]; String[] withoutspacearray = new String[finalseparatedarray.length]; int count = 0; for (int i = 0; i < firstRequest.length; i++) { combinedarray[i] = firstRequest[i]; count++; } for (int j = 1; j < (elements.length); j++) { combinedarray[count++] = elements[j]; } count = 0; for (int k = 0; k < combinedarray.length; k++) { if (combinedarray[k].contains("}") && combinedarray[k].contains("{")) { String part_string1 = combinedarray[k].substring(0, combinedarray[k].indexOf('}')); String part_string2 = combinedarray[k].substring(combinedarray[k].indexOf('{') + 1, combinedarray[k].length()); finalarray[count] = part_string1; finalarray[count + 1] = part_string2; count++; } else { finalarray[count] = combinedarray[k]; count++; } } for (int l = 0; l < finalarray.length; l++) { if (finalarray[l].contains("{")) { int delimiter_location = finalarray[l].indexOf('{'); String part_string = finalarray[l].substring(delimiter_location + 1, finalarray[l].length()); finalseparatedarray[l] = part_string; } else if (finalarray[l].contains("}")) { int delimiter_location = finalarray[l].indexOf('}'); String part_string = finalarray[l].substring(0, delimiter_location); finalseparatedarray[l] = part_string; } else { finalseparatedarray[l] = finalarray[l]; } } count = 0; for (int l = 0; l < finalseparatedarray.length; l++) { if (!finalseparatedarray[l].isEmpty() && !finalseparatedarray[l].contains("reply")) { withoutspacearray[count] = finalseparatedarray[l].trim(); count++; } } int len_withoutspacearray = count;

         /*for (int i = 0; i < len_withoutspacearray; i++) {
         System.out.println(withoutspacearray[i]); }*/

        LinkedHashMap<String, String> holdReplyMap = new LinkedHashMap<String, String>();
        holdReplyMap.put("Reply_Execution_Date", withoutspacearray[0]);
        holdReplyMap.put("Reply_Execution_Time", withoutspacearray[1]);
        holdReplyMap.put("Reply_Running_Thread_Name", withoutspacearray[2]);
        holdReplyMap.put("Reply_Node_Name", withoutspacearray[3]);
        holdReplyMap.put("Reply_Type_Of_Server", withoutspacearray[4]);
        holdReplyMap.put("Reply_Server_SubType", withoutspacearray[5]);
        holdReplyMap.put("Reply_Service_Class_Method", withoutspacearray[6]);
        holdReplyMap.put("Reply_ConversationID", withoutspacearray[7].substring(withoutspacearray[7].indexOf("=") + 1,
                withoutspacearray[7].length()));
        holdReplyMap.put("Reply_ApplicationID", withoutspacearray[8]);
        holdReplyMap.put("Reply_Unique_UserID", withoutspacearray[9]);
        holdReplyMap.put("Reply_Service_Name", withoutspacearray[10]);

String Request_ColumnName="accountNumber,groundShipperNumber,meterNumber,masterMeterNumber,meterInstance,companyId,softwareId,softwareRelease,clientProductId,clientProductVersion,middlewareProductId,middlewareProductVersion,integratorId,region,autoConfigurationType,cspCredentialKey,userCredentialKey,content,languageCode,localeCode,userId,password,uniqueUserId,customerTransactionId,languageCode,localeCode,internalTransactionId,tracing,sourceFormat,environment,serviceId,major,intermediate,minor,applicationId,keywordTypes,commodityId,name,numberOfPieces,description,purpose,countryOfManufacture,harmonizedCode,units,value,quantity,quantityUnits,quantity,units,currency,amount,currency,amount,category,value,exportLicenseNumber,exportLicenseExpirationDate,cIMarksAndNumbers,partNumber,preferenceCriterion,producerDetermination,producerId,netCostMethod,begins,ends";

        String[] Request_ColumnNameArray;
        Request_ColumnNameArray=Request_ColumnName.split(",");

        for(int j=0;j<Request_ColumnNameArray.length;j++) {
                     int contain = 0;
                     out: for (int i = 11; i < len_withoutspacearray; i++) {
                            if (withoutspacearray[i].contains(Request_ColumnNameArray[j])) {

// System.out.println(withoutspacearray[i]+" "+st); holdReplyMap.put(Request_ColumnNameArray[j], withoutspacearray[i].substring(withoutspacearray[i].indexOf(":") + 1, withoutspacearray[i].length())); break out; } else { contain++; } if (contain != 0 && i == len_withoutspacearray-1) { holdReplyMap.put(Request_ColumnNameArray[j], "null"); } } } Gson gson = new Gson(); String json = gson.toJson(holdReplyMap, LinkedHashMap.class); holdReplyMap.clear(); return json; } else { String[] elements1; String[] values = new String[20];

    int alternate_start=line.indexOf("alternateKeywords");
    //System.out.println(alternate_start);
    String new_string=line.substring(alternate_start+17, line.length());
    //System.out.println(new_string);
    elements1=new_string.split(":");
    String alternateKeyword_String="";
    for(int i=0;i<elements1.length;i++)
    {
        //System.out.println(elements1[i]);
        if(elements1[i].contains("}"))
        {
            alternateKeyword_String=alternateKeyword_String+elements1[i].substring(0, elements1[i].indexOf("}"))+" | ";
        }
    }
    String[] elements;
    String[] firstRequest;
    line=line.substring(0,alternate_start);
    elements = line.split(",");
    firstRequest = elements[0].split(" ");
    String[] combinedarray = new String[elements.length + (firstRequest.length - 1)];
    String[] finalarray = new String[combinedarray.length];
    String[] finalseparatedarray = new String[finalarray.length];
    String[] withoutspacearray = new String[finalseparatedarray.length];
    int count = 0;
    for (int i = 0; i < firstRequest.length; i++) {
        combinedarray[i] = firstRequest[i];
        count++;
    }
    for (int j = 1; j < (elements.length); j++) {
        combinedarray[count++] = elements[j];
    }
    count = 0;
    for (int k = 0; k < combinedarray.length; k++) {
        //System.out.println(combinedarray[k]);
        if (combinedarray[k].contains("}") && combinedarray[k].contains("{")) {
            String part_string1 = combinedarray[k].substring(0, combinedarray[k].indexOf('}'));
            String part_string2 = combinedarray[k].substring(combinedarray[k].indexOf('{') + 1,
                    combinedarray[k].length());
            finalarray[count] = part_string1;
            finalarray[count + 1] = part_string2;
            count++;
        } else {
            finalarray[count] = combinedarray[k];
            count++;
        }
    }
    for (int l = 0; l < finalarray.length; l++) {
        if (finalarray[l].contains("{")) {
            int delimiter_location = finalarray[l].indexOf('{');
            String part_string = finalarray[l].substring(delimiter_location + 1, finalarray[l].length());
            finalseparatedarray[l] = part_string;
        } else if (finalarray[l].contains("}"))
        {
            int delimiter_location = finalarray[l].indexOf('}');
            String part_string = finalarray[l].substring(0,delimiter_location);
            finalseparatedarray[l] = part_string;
        }
        else
        {
            finalseparatedarray[l] = finalarray[l];
        }
    }
    count = 0;
    for (int l = 0; l < finalseparatedarray.length; l++) {
        if (!finalseparatedarray[l].isEmpty() && !finalseparatedarray[l].contains("request:")
                && !finalseparatedarray[l].contains("reply")) {
            withoutspacearray[count] = finalseparatedarray[l].trim();
            count++;
        }
    }
    int len_withoutspacearray = count;

    /*for (int i = 0; i < len_withoutspacearray; i++) {
    System.out.println(withoutspacearray[i]); }*/

    LinkedHashMap<String, String> holdReplySubMap = new LinkedHashMap<String, String>();
    holdReplySubMap.put("Reply_Execution_Date", withoutspacearray[0]);
    holdReplySubMap.put("Reply_Execution_Time", withoutspacearray[1]);
    holdReplySubMap.put("Reply_Running_Thread_Name", withoutspacearray[2]);
    holdReplySubMap.put("Reply_Node_Name", withoutspacearray[3]);
    holdReplySubMap.put("Reply_Type_Of_Server", withoutspacearray[4]);
    holdReplySubMap.put("Reply_Server_SubType", withoutspacearray[5]);
    holdReplySubMap.put("Reply_Service_Class_Method", withoutspacearray[6]);
    holdReplySubMap.put("Reply_ConversationID",
            withoutspacearray[7].substring(withoutspacearray[7].indexOf("=") + 1, withoutspacearray[7].length()));
    holdReplySubMap.put("Reply_ApplicationID", withoutspacearray[8]);
    holdReplySubMap.put("Reply_Unique_UserID", withoutspacearray[9]);
    holdReplySubMap.put("Reply_Service_Name", withoutspacearray[10]);

    String Reply_ColumnName="highestSeverity,severity,source,code,message,localizedMessage,id,value,customerTransactionId,languageCode,localeCode,internalTransactionId,tracing,sourceFormat,environment,serviceId,major,intermediate,minor,commodityId,name,description,harmonizedCode,types,value";

    String[] Reply_ColumnNameArray;

    Reply_ColumnNameArray=Reply_ColumnName.split(",");
    for (int j=0;j<Reply_ColumnNameArray.length;j++) {
                        int contain = 0;
                        out: for (int i = 11; i < len_withoutspacearray; i++) {
                               //System.out.println(withoutspacearray[i]+" "+st);
                               if (withoutspacearray[i].contains(Reply_ColumnNameArray[j])) {
                                      if (Reply_ColumnNameArray[j].equals("value") || Reply_ColumnNameArray[j].equals("types")) {
                                             holdReplySubMap.put("primaryKeyword:" + Reply_ColumnNameArray[j], withoutspacearray[i]
                                                           .substring(withoutspacearray[i].indexOf(":") + 1, withoutspacearray[i].length()));
                                      }
                                      else
                                      {
                                             holdReplySubMap.put(Reply_ColumnNameArray[j], withoutspacearray[i].substring(withoutspacearray[i].indexOf(":") + 1,
                                                           withoutspacearray[i].length()));
                                      }
                                      break out;
                               } else {
                                      contain++;
                               }
                               if (contain != 0 && i == len_withoutspacearray-1) {
                                      holdReplySubMap.put(Reply_ColumnNameArray[j], "null");
                               }
                        }
                 }
           holdReplySubMap.put("alternatekeywords_types", "TRANSIT");
           holdReplySubMap.put("alternatekeyords_value",alternateKeyword_String);

    Gson gson = new Gson();
    String json = gson.toJson(holdReplySubMap, LinkedHashMap.class);
    holdReplySubMap.clear();
    return json;

    }

}

// Fetch the columns for invoke Time
static String invoke(String line) throws IOException {
    String[] elements;
    elements = line.split(" ");
    LinkedHashMap<String, String> holderInvokeMap = new LinkedHashMap<String, String>();
    holderInvokeMap.put("Invoke_Execution_Date", elements[0]);
    holderInvokeMap.put("Invoke_Execution_Time", elements[2]);
    holderInvokeMap.put("Invoke_Running_Thread_Name", elements[3]);
    holderInvokeMap.put("Invoke_Node_Name", elements[4]);
    holderInvokeMap.put("Invoke_Type_Of_Server", elements[5]);
    holderInvokeMap.put("Invoke_Server_SubType", elements[6]);
    holderInvokeMap.put("Invoke_Service_Class_Method", elements[7]);
    holderInvokeMap.put("Invoke_ConversationID", elements[9].substring(elements[9].indexOf("=") + 1, elements[9].length()));
    holderInvokeMap.put("Invoke_ApplicationID", elements[10]);
    holderInvokeMap.put("Invoke_Unique_UserID", elements[11]);
    holderInvokeMap.put("Invoke_Service_Name", elements[12]);
    holderInvokeMap.put("Heap_Memory_Start", elements[18]);
    holderInvokeMap.put("Percent_Change_In_Memory_Heap", elements[25]);
    holderInvokeMap.put("Invoke_Time", elements[28]);

    Gson gson = new Gson();
    String json = gson.toJson(holderInvokeMap, LinkedHashMap.class);
    holderInvokeMap.clear();
    return json;
}

}