OzWookiee / avaya-smdr-elasticstack

Includes a logstash pipeline, elasticsearch mappings and several kibana visualisations and dashboards
GNU General Public License v3.0
5 stars 3 forks source link

using docker - cannot find index after setting up logstash pipeline #3

Closed loekey87 closed 8 months ago

loekey87 commented 4 years ago

I have and ELK docker set up and I've imported everything into kibana, set up the pipline and double checked my settings below is my pipeline and screenshot of SMDR setup, i've tried several different options for host for elasticsearch as well but it's not picking up the index (ip address of phone system has been ommited as well as ip of host running ELK stack:

input {

connects directly to our phone server

tcp { mode => "client" host => "######" port => 3000 type => "smdr" } #Live SMDR

}#input

filter { if "smdr" not in [tags] { if [type] == "smdr" {

read in new data and process

        csv {
            columns => [ "CallStart", "ConnectedTime", "RingTime", "Caller", "Direction", "CalledNumber", "DialledNumber", "Account", "IsInternal", "CallID", "Continuation", "Part1Device", "Party1Name", "Party2Device", "Party2Name", "HoldTime", "ParkTime", "AuthValid", "AuthCode", "UserCharged", "CallCharge", "Currency", "AmountAtLastUserCharge", "CallUnits", "UnitsAtLastUserCharge", "CostPerUnit", "Markup", "ExternalTargetingCause", "ExternalTargeterID", "ExternalTargetedNumber",
            "server_ip_caller_extention","CallID_caller_extention","server_ip_called_extention","CallID_called_extention","UTC_time"]
            separator => ","
        }
        mutate {
            add_field => [ "lstimestamp", "%{@timestamp}"]
            add_field => [ "TotalCallTimeSecs", "-1.0" ]
            add_field => [ "TotalCallTimeMins", "-1.0" ]
            add_field => [ "DayNumLocal", "-1"]
            add_field => [ "DayNameLocal", "NoDay"]
            add_field => [ "CallHourLocal", "-1"]
            add_field => [ "username", ""]
            add_tag => [ "smdr" ]
            update => [ "host", "smdr"]
        }

        #Add tags for the different departments
        if "E53" in [Part1Device]  {
            mutate {
                add_tag => [ "CS"]
            }
        } else if "E529" in [Part1Device] {
            mutate {
                add_tag => [ "Data"]
            }
        }else if "E523" in [Part1Device] {
            mutate {
                add_tag => [ "Dev"]
            }
        }else if "E522" in [Part1Device] {
            mutate {
                add_tag => [ "Accounts"]
            }
        } else if "E521" in [Part1Device] {
            mutate {
                add_tag => [ "Sales"]
            }
        } else if "E520" in [Part1Device] {
            mutate {
                add_tag => [ "Admin"]
            }
        }

    }#if type smdr
}#if smdr not tags

if "smdr" in [tags] {
    #Set the Event Timestamp from the log
    date {
         match => ["CallStart", "YYYY/MM/dd HH:mm:ss"]
         timezone => "US/East"
    }#date

    #Convert ConnectedTime from Time to Secs
    grok {
        match => [ "ConnectedTime" , "%{HOUR:ConnTimeHr:int}:%{MINUTE:ConnTimeMin:int}:%{SECOND:ConnTimeSec:int}"]
        tag_on_failure => [ "_failed-smdr-ConnTime" ]
    }

    #CASTING
    mutate {
        convert => [ "CallHourLocal", "integer" ]
        convert => [ "TotalCallTimeSecs", "integer" ]
        convert => [ "DayNumLocal", "integer" ]
        convert => [ "ConnTimeMin", "integer" ]
        convert => [ "RingTime", "integer" ]
        convert => [ "IsInternal", "integer" ]
        convert => [ "CallID", "integer" ]
        convert => [ "Continuation", "integer" ]
        convert => [ "HoldTime", "integer" ]
        convert => [ "ParkTime", "integer" ]
        convert => [ "AuthValid", "integer" ]
        convert => [ "CostPerUnit", "integer" ]
        convert => [ "CallUnits", "integer" ]
        convert => [ "Markup", "float" ]
        convert => [ "UnitsAtLastUserCharge", "integer" ]
        convert => [ "AmountAtLastUserCharge", "float" ]
        convert => [ "CallCharge", "float" ]
        remove_field => ["ruby_exception"]

        remove_field => ["column31"]
        remove_field => ["column32"]
        remove_field => ["column33"]
        remove_field => ["column34"]
        remove_field => ["column35"]
    }

    if [TotalCallTimeSecs] {
        ruby{ code => "
            begin
                # your great code goes here
                event.set('TotalCallTimeSecs', ( event.get('ConnTimeHr') * 60 * 60 ) + ( event.get('ConnTimeMin') * 60 ) + event.get('ConnTimeSec') + event.get('RingTime') + event.get('HoldTime') + event.get('ParkTime'))

                t = Time.at( event.get('@timestamp').to_i )   # Convert number of seconds into Time object.
                event.set('DayNumLocal', t.wday.to_i )
                event.set('DayNameLocal', t.strftime('%A') )
                event.set('CallHourLocal', t.strftime('%H').to_i )

                rescue Exception => e
                    event.set('ruby_exception', 'smdr ' + e.message )
            end
        " }
    }#if TotalConnectedTimeSecs

    #Remove unneeded fields
    mutate {
        #CallStart is redundant and not needed.
        remove_field => [ "ConnTimeHr" ]
        remove_field => [ "ConnTimeMin" ]
        remove_field => [ "ConnTimeSec" ]
        remove_field => [ "CallStart" ]
        remove_tag => [ "smdr" ]
        convert => [ "TotalCallTimeSecs", "integer" ]
    }

    if [Party1Name] {
        #Username lookup
        #in this I have a simple two column CSV file that contains the Avaya Party1Name (Name field in IP Office Manager) and the windows domain username
        #"Bradley B","bradley.bristowstagg"
        #The idea here is to create a unique key for the user so that we can link data from this and other index
        translate {
            field => "Party1Name"
            destination => "username"
            override => "true"
            #add_tag => [ "translate-severity" ]
            fallback => "no match"
            #dictionary_path => "/home/smdr/docker-elk/logstash/users.csv"
            dictionary_path => "/home/smdr/docker-elk/logstash/users.csv"
        }#translate
    }#if Party1Name

    #Call Type
    if [Direction] == "I" {
        if [is_internal] == 0 {
            mutate {
                add_field => [ "call_type", "Incoming External Call" ]
            }
        }
    } 
    else {
        if [is_internal] == 1 {
            mutate {
                add_field => [ "call_type", "Internal Call" ]
            }
        }
        else {
            mutate {
                add_field => [ "call_type", "Outgoing External Call" ]
            }
        }
    }

    #U Xfd
    #ExternalTargetingCause TargetedBy & ReasonCode
    if [ExternalTargetingCause] {
        grok {
            match => { "ExternalTargetingCause" => [ 
                "%{NOTSPACE:TargetedBy:string}", 
                "%{NOTSPACE:TargetedBy:string} %{NOTSPACE:ReasonCode:string}" ] } 
            tag_on_failure => [ "_grokparsefailure_ExternalTargetingCause" ] 
        }

        #As long as the grok works, do the rest
        if "_grokparsefailure_ExternalTargetingCause" not in [tags] {
            translate {
                field => "TargetedBy"
                destination => "TargetedBy_label"
                override => "true"
                #add_tag => [ "TargetedBy" ]
                fallback => "no match"
                dictionary => [ 
                    "HG","HuntGroup.",
                    "U","User.",
                    "AA","AutoAttendant.",
                    "ICR","Incoming CallRoute.",
                    "RAS","Remote AccessService.",
                    "?","Other."
                    ]
            }#translate
            translate {
                field => "ReasonCode"
                destination => "ReasonCode_label"
                override => "true"
                #add_tag => [ "ReasonCode" ]
                fallback => "no match"
                dictionary => [ 
                    "FB","Forward onBusy. ",
                    "FU","Forwardunconditional.",
                    "FNR","Forward on NoResponse.",
                    "FDND","Forward onDND.",
                    "CFP","Conference proposal (consultation)call.",
                    "Cfd","Conferenced.",
                    "MT","MobileTwinning.",
                    "TW","Teleworker.",
                    "XFP","Transfer proposal (consultation) call.",
                    "XFD","Transferredcall."
                    ]
            }#translate
        }#if not in tags  
    }#if ExternalTargetingCause

}#if smdr in tags

} output {

The following should go to a single index per year and have the entries output to a text file as backup

elasticsearch {
    hosts => ["#######"]
    #sniffing => true
    index => "smdr-%{+YYYY}"
    #idle_flush_time => 30
    #manage_template => false
    }

file {
        path => "/home/smdr/docker-elk/logstash/output-smdr-%{+YYYY-MM}.log"
    }
#of course you should back these files up seperately

}

image

OzWookiee commented 4 years ago

What do the logstash logs show? is it showing positive connection to your SMDR service? Is it successfully pushing to your elastic index? Use Kibana to check the index, if there are no entries then the issue should be in Logstash somewhere