phutchins / logstash-input-mongodb

MongoDB input plugin for Logstash
Other
187 stars 104 forks source link

Aggregate filter not working in Logstash 5.4.2. #76

Open arunvijayam opened 7 years ago

arunvijayam commented 7 years ago

```
# Pipeline: read insured/policy rows from Oracle over JDBC, collapse all rows
# that share the same `message` key into one document (aggregate filter), and
# write the resulting documents to MongoDB.
#
# IMPORTANT: the aggregate filter keeps its maps in memory per worker. Run this
# pipeline with a single filter worker (`-w 1` / `pipeline.workers: 1`),
# otherwise rows of the same task can be spread over several workers and the
# aggregation silently produces partial documents.
input {
  jdbc {
    jdbc_connection_string => "jdbc:oracle:thin:@drssqlentrac_sc.aaa-acg.net:1521/orasliud"
    jdbc_user => "admin"
    jdbc_password => "admin"
    jdbc_driver_library => "C:\Official\ojdbc6-11.2.0.3.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    # NOTE(review): the pasted query built `message` from CI.FIRSTNAME / CI.LASTNAME
    # with '' separators, while every other reference uses CI.FIRST_NAME /
    # CI.LAST_NAME. This looks like underscores eaten by markdown rendering, so the
    # key is reconstructed as FIRST_NAME_LAST_NAME_GENDER_YYYYMMDD - confirm
    # against the real schema.
    #
    # The ORDER BY is required: push_previous_map_as_event only works when all
    # rows of one task_id arrive one after another.
    statement => "
      SELECT
        CI.FIRST_NAME || '_' || CI.LAST_NAME || '_' || CI.GENDER || '_' || TO_CHAR(CI.DATE_OF_BIRTH, 'YYYYMMDD') AS message,
        CI.TITLE AS prefix,
        CI.FIRST_NAME AS firstname,
        CI.MIDDLE_NAME AS middlename,
        CI.LAST_NAME AS lastname,
        CI.TITLE_SUFFIX AS suffix,
        CI.GENDER AS gender,
        CI.DATE_OF_BIRTH AS dob,
        CI.MARITAL_STATUS AS maritalstatus,
        CI.SOCIAL_SECURITY_NUM AS ssn,
        CI.REF_NUMBER AS clientid,
        NULL AS customersegmentdescription,
        NULL AS householdid,
        CASE WHEN CIP.PRIMARY_FLAG = 'Y' THEN PHONE_NUM END AS primaryphone,
        CASE WHEN CIP.NUM_TYPE = 'Home' THEN PHONE_NUM END AS homephone,
        CASE WHEN CIP.NUM_TYPE = 'Office' THEN PHONE_NUM END AS workphone,
        CIP.EXTENSION AS extension,
        CASE WHEN CIP.NUM_TYPE = 'Cell' THEN PHONE_NUM END AS mobilephone,
        CASE WHEN CIP.NUM_TYPE = 'Fax' THEN PHONE_NUM END AS faxphone,
        CI.EMAIL_ADDRESS AS primaryemail,
        NULL AS secondaryemail,
        regexp_replace(CO.ADDRESS_LINE1, '[^a-zA-Z0-9 ]+', '') AS address1,
        regexp_replace(CO.ADDRESS_LINE2, '[^a-zA-Z0-9 ]+', '') AS address2,
        regexp_replace(CO.ADDRESS_LINE3, '[^a-zA-Z0-9 ]+', '') AS address3,
        CO.COUNTY_ID || ' - ' || CO.COUNTY AS county,
        CO.CITY AS city,
        CO.STATE_ID || ' - ' || CO.STATE AS state,
        CO.ZIPCODE AS zipcode,
        CO.COUNTRY AS country,
        CPA.ADDRESS_TYPE AS addresstype,
        CI.DL_NUMBER AS licensenumber,
        CI.DL_STATE_ID AS licensestate,
        CPS.MEMBERSHIP_NUMBER AS membershipid,
        NULL AS membershipflag,
        CPT.POLICY_NUM_PREFIX || CPT.POLICY_NUM AS policynumber,
        CPT.STATUS AS policystatus,
        NULL AS associatedrole,
        CPY.REF_NUMBER AS agentid,
        CPY.FULL_NAME AS agentname,
        NULL AS agenttype,
        NULL AS agentcontactnumber,
        CPY.FULL_NAME AS agencyname,
        NULL AS agentnumber
      FROM CO_INSURED CI
      LEFT OUTER JOIN CO_INSURED_PHONE_NUM CIP ON CI.CO_ID = CIP.CO_ID
      LEFT OUTER JOIN CO_ADDRESS CO ON CI.CO_ID = CO.CO_ID
      LEFT OUTER JOIN CO_PARTY_ADDRESS CPA ON CI.CO_ID = CPA.CO_ID
      LEFT OUTER JOIN CO_POLICY_SNAPSHOT CPS ON CI.CO_ID = CPS.CO_ID
      LEFT OUTER JOIN CO_POLICY_TERM CPT ON CI.CO_ID = CPT.CO_ID
      LEFT OUTER JOIN CO_PARTY CPY ON CI.CO_ID = CPY.CO_ID
      ORDER BY message
    "
    add_field => { "batch_id" => "SPL%{+YYYYMMdd}" }
  }
}

filter {
  # Capitalize every whitespace-separated word of the first name.
  # A missing value becomes an empty string (nil.to_s), as in the original.
  ruby {
    code => "
      event.set('firstname', event.get('firstname').to_s.split(' ').map(&:capitalize).join(' '))
    "
  }

  # Same normalisation for the last name.
  ruby {
    code => "
      event.set('lastname', event.get('lastname').to_s.split(' ').map(&:capitalize).join(' '))
    "
  }

  # Collapse all rows with the same `message` into one event. Each source row
  # is cancelled; the accumulated map is emitted when the next task_id shows up
  # (push_previous_map_as_event) or when the timeout fires for the last task.
  aggregate {
    task_id => "%{message}"
    code => "
      # Per-person scalar fields. Keep the first non-nil value: the phone
      # columns are NULL on every joined row whose NUM_TYPE does not match, so
      # plain assignment would let a later row wipe an earlier phone number.
      # batch_id is carried over too, otherwise it is lost with the cancelled
      # source events.
      [
        'message', 'prefix', 'firstname', 'middlename', 'lastname', 'suffix',
        'gender', 'dob', 'maritalstatus', 'ssn', 'clientid',
        'customersegmentdescription', 'householdid',
        'primaryphone', 'homephone', 'workphone', 'extension', 'mobilephone', 'faxphone',
        'primaryemail', 'secondaryemail',
        'licensenumber', 'licensestate', 'membershipid', 'membershipflag',
        'hash_key', 'batch_id'
      ].each do |field|
        map[field] = event.get(field) if map[field].nil?
      end

      # The independent LEFT JOINs on CO_ID multiply rows, so the same
      # address/policy/agent shows up many times; store each distinct one once.
      address = {
        'address1' => event.get('address1'),
        'address2' => event.get('address2'),
        'address3' => event.get('address3'),
        'county' => event.get('county'),
        'city' => event.get('city'),
        'state' => event.get('state'),
        'zipcode' => event.get('zipcode'),
        'country' => event.get('country'),
        'addresstype' => event.get('addresstype')
      }
      map['address'] ||= []
      map['address'] << address unless map['address'].include?(address)

      policy = {
        'policynumber' => event.get('policynumber'),
        'policystatus' => event.get('policystatus'),
        'associatedrole' => event.get('associatedrole')
      }
      map['policy'] ||= []
      map['policy'] << policy unless map['policy'].include?(policy)

      agent = {
        'agentid' => event.get('agentid'),
        'agentname' => event.get('agentname'),
        'agenttype' => event.get('agenttype'),
        'agentcontactnumber' => event.get('agentcontactnumber'),
        'agencyname' => event.get('agencyname'),
        'agentnumber' => event.get('agentnumber')
      }
      map['agent'] ||= []
      map['agent'] << agent unless map['agent'].include?(agent)

      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
    aggregate_maps_path => "C:/Official/.aggregate_maps"
    inactivity_timeout => 3
    map_action => "create_or_update"
    push_map_as_event_on_timeout => true
    timeout_code => "event.set('state', 'timeout')"
    timeout_tags => ["aggregate_timeout"]
    timeout_task_id_field => "message_failed"
  }
}

output {
  mongodb {
    uri => "mongodb://localhost:27017/"
    database => "STAGING2"
    collection => "SPL_STAGING2"
    isodate => true
  }
}
```