matanolabs / matano

Open source security data lake for threat hunting, detection & response, and cybersecurity analytics at petabyte scale on AWS
https://matano.dev
Apache License 2.0

teleport - Managed Log Source #150

Closed chrismsnz closed 11 months ago

chrismsnz commented 1 year ago

Mentioned in #17

Teleport is an access management system that emits many different kinds of audit events.

https://goteleport.com/docs/reference/audit/

There doesn't appear to be One True Way to get these logs into AWS. For my purposes, I'm using Vector to ship a local log file to Firehose.

The log format is quite loosely defined, and I've seen it emit events that are not documented. It will be important to be strict about what we keep so that it fits the schema, and to keep a copy of the original event in event.original in case the format changes and users need to parse out the new fields themselves.
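
To illustrate the "strict, but keep the original" idea outside of the pipeline, here is a minimal Python sketch. The allow-list `KNOWN_FIELDS` and the `normalize` helper are hypothetical names made up for this example, not part of Matano or Teleport; the point is only that undocumented keys are dropped from the parsed record while the raw line stays recoverable.

```python
import json

# Hypothetical allow-list: only these top-level keys are mapped into the schema.
KNOWN_FIELDS = {"event", "time", "uid", "code", "user", "login"}


def normalize(raw_line: str) -> dict:
    """Keep only allow-listed fields and preserve the raw event verbatim."""
    event = json.loads(raw_line)
    kept = {k: v for k, v in event.items() if k in KNOWN_FIELDS}
    # Unknown or undocumented keys are dropped from the parsed record,
    # but they remain recoverable from the untouched original string.
    return {"fields": kept, "event": {"original": raw_line}}


sample = '{"event": "user.login", "uid": "abc", "new_undocumented_key": 1}'
record = normalize(sample)
```

If the upstream format changes, a user can re-parse `record["event"]["original"]` to pull out whatever the strict mapping ignored.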

Panther has Teleport support: https://docs.panther.com/data-onboarding/supported-logs/teleport

chrismsnz commented 1 year ago

First attempt at log ingestion for Teleport. It's a bit of a mess, but it has been running for a couple of weeks on busy Teleport servers with no schema or VRL errors.

I break out as much as I can to ECS, but there's still a lot of Teleport-specific content, which I write to its own top-level teleport field. Event categorisation is best-effort; I'm sure there's stuff I missed.
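
For checking the best-effort categorisation against sample events without deploying anything, the rules can be mirrored in a few lines of Python. This is only a sketch: the `categorize` function is a hypothetical helper, and the action lists are copied from the transform in this comment, so they share whatever gaps it has.

```python
from __future__ import annotations

SESSION_ACTIONS = {"session.start", "session.end", "session.join",
                   "session.leave", "app.session.start"}
START_ACTIONS = {"session.start", "session.join", "app.session.start"}
END_ACTIONS = {"session.end", "session.leave"}


def categorize(action: str) -> tuple[list[str], list[str]]:
    """Return (event.category, event.type) for a Teleport event name."""
    category: list[str] = []
    event_type: list[str] = []
    if action in ("auth", "user.login"):
        category.append("authentication")
    if "db." in action:  # substring match, same as the transform
        category.append("database")
    if action in ("session.disk", "scp"):
        category.append("file")
    if action == "session.network":
        category.append("network")
        event_type.append("connection")
    if action in SESSION_ACTIONS:
        category.append("session")
    if action in START_ACTIONS:
        event_type.append("start")
    if action in END_ACTIONS:
        event_type.append("end")
    return category, event_type
```

Event names that match none of the rules come back with two empty lists, which makes uncategorised events easy to spot when running a sample log through it.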

Some gotchas:

name: "teleport_audit_logs"
schema:
  ecs_field_names:
  - destination.user.name
  - destination.address
  - destination.ip
  - destination.port
  - destination.bytes
  - ecs.version
  - event.action
  - event.category
  - event.created
  - event.code
  - event.end
  - event.id
  - event.original
  - event.outcome
  - event.reason
  - event.start
  - event.type
  - file.directory
  - host.hostname
  - host.id
  - process.args
  - process.command_line
  - process.executable
  - process.exit_code
  - process.name
  - process.pid
  - process.parent.pid
  - network.direction
  - network.type
  - related.ip
  - related.user
  - related.hosts
  - source.user.name
  - source.address
  - source.ip
  - source.port
  - source.bytes
  - user.name
  fields:
  - name: teleport
    type:
      type: struct
      fields:
      - name: action
        type: string
      - name: attributes
        type: string
      - name: cgroup_id
        type: int
      - name: cluster_name
        type: string
      - name: ei
        type: int
      - name: enhanced_recording
        type: boolean
      - name: interactive
        type: boolean
      - name: method
        type: string
      - name: namespace
        type: string
      - name: participants
        type:
          type: list
          element: string
      - name: proto
        type: string
      - name: operation
        type: string
      - name: server_addr
        type: string
      - name: server_labels
        type: string
      - name: session_recording
        type: string
      - name: sid
        type: string
      - name: size
        type: string
transform: |
  .related.hosts = []
  .related.ip = []
  .related.user = []
  .event.original = encode_json(.json)
  .event.created = .ts 
  .ts = to_timestamp!(del(.json.time)) 

  .event.action = to_string!(del(.json.event))

  .event.category = []
  .event.type = []
  if includes(["auth", "user.login"], .event.action) {
    .event.category = push(.event.category, "authentication")
  }
  if contains(.event.action, "db.") {
    .event.category = push(.event.category, "database")
  }
  if includes(["session.disk", "scp"], .event.action) {
    .event.category = push(.event.category, "file")
  }
  if .event.action == "session.network" {
    .event.category = push(.event.category, "network")
    .event.type = push(.event.type, "connection")
  }
  if includes(["session.start", "session.end", "session.join", "session.leave", "app.session.start"], .event.action) {
    .event.category = push(.event.category, "session")
  }
  if includes(["session.start", "session.join", "app.session.start"], .event.action) {
    .event.type = push(.event.type, "start")
  }
  if includes(["session.end", "session.leave"], .event.action) {
    .event.type = push(.event.type, "end")
  }

  .event.id = del(.json.uid)
  .event.code = del(.json.code)
  if is_boolean(.json.success) {
    if bool!(.json.success) {
      .event.outcome = "success"
    } else {
      .event.outcome = "failure"
    }
    del(.json.success)
  }
  .event.reason = del(.json.error)
  if .json.session_start != null {
    .event.start = to_timestamp!(del(.json.session_start))
  }
  if .json.session_stop != null {
    .event.end = to_timestamp!(del(.json.session_stop))
  }

  .teleport.cgroup_id = del(.json.cgroup_id)
  .teleport.cluster_name = del(.json.cluster_name)
  .teleport.ei = del(.json.ei)
  .teleport.enhanced_recording = del(.json.enhanced_recording)
  .teleport.interactive = del(.json.interactive)
  .teleport.method = del(.json.method)
  .teleport.namespace = del(.json.namespace)
  .teleport.participants = del(.json.participants)
  .teleport.proto = del(.json.proto)
  .teleport.sid = del(.json.sid)
  .teleport.size = del(.json.size)
  .teleport.server_addr = del(.json.server_addr)
  .teleport.session_recording = del(.json.session_recording)
  if .json.action != null {
    .teleport.action = to_string!(del(.json.action))
  }
  if .json.operation != null {
    .teleport.operation = encode_json(del(.json.operation))
  } 
  if .json.attributes != null {
    .teleport.attributes = encode_json(del(.json.attributes))
  }
  if .json.server_labels != null {
    .teleport.server_labels = encode_json(del(.json.server_labels))
  }

  .source.user.name = del(.json.user)
  .destination.user.name = del(.json.login)

  .host.hostname = del(.json.server_hostname)
  .host.id = del(.json.server_id)

  if .event.action == "session.command" {
    .process.name = del(.json.program)
    .process.executable = del(.json.path)
    .process.args = del(.json.argv)
    .process.exit_code = del(.json.return_code)
  }
  .process.pid = del(.json.pid)
  .process.parent.pid = del(.json.ppid)
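  .process.command_line = del(.json.initial_command)
  # only overwrite when the field is present, otherwise the value above is lost
  if .json.command != null {
    .process.command_line = del(.json.command)
  }
  if .json.exitCode != null {
    .process.exit_code = to_int!(del(.json.exitCode))
  }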

  .source.address = del(.json."addr.remote")
  if .source.address != null {
    src_tuple = split!(.source.address, ":", limit:2)
    .source.ip = src_tuple[0]
    .source.port = to_int!(src_tuple[1])
  }

  .destination.address = del(.json."addr.local")
  if .destination.address != null {
    dst_tuple = split!(.destination.address, ":", limit:2)
    .destination.ip = dst_tuple[0]
    .destination.port = to_int!(dst_tuple[1])
  }

  if .event.action == "session.network" {
    .destination.address = del(.json.dst_addr)
    .destination.ip = .destination.address
    .destination.port = del(.json.dst_port)
    .source.address = del(.json.src_addr)
    .source.ip = .source.address

    .network.direction = "egress"
    if .json.version == 4 {
      .network.type = "ipv4"
    } else if .json.version == 6 {
      .network.type = "ipv6"
    }
    del(.json.version)    
  }

  if .event.action == "scp" {
    .file.directory = del(.json.path)
  }

  if .json.tx != null {
    .source.bytes = to_int!(.json.tx)
  }
  if .json.rx != null {
    .destination.bytes = to_int!(.json.rx)
  }

  # tidy up ecs
  .user = .source.user
  .related.ip = push(.related.ip, .source.ip)
  .related.ip = push(.related.ip, .destination.ip)
  .related.user = push(.related.user, .source.user.name)
  .related.user = push(.related.user, .destination.user.name)
  if is_array(.teleport.participants) {
    .related.user = append!(.related.user, .teleport.participants)
  }
  .related.user = unique(.related.user)
  .related.hosts = push(.related.hosts, .host.hostname)
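
One thing to watch in the address handling above: splitting on the first ":" works for IPv4 host:port pairs but not for IPv6 addresses. A minimal Python sketch of a more tolerant split follows. It assumes addresses arrive as host:port with IPv6 hosts in brackets (for example [2001:db8::1]:3022), which I have not confirmed for every Teleport event type; `split_host_port` is a hypothetical helper name.

```python
from __future__ import annotations


def split_host_port(address: str) -> tuple[str, int]:
    """Split 'host:port' on the last colon, unwrapping bracketed IPv6 hosts."""
    host, _, port = address.rpartition(":")
    if not host:
        # No colon at all, or nothing before it: not a host:port pair.
        raise ValueError(f"not a host:port address: {address!r}")
    return host.strip("[]"), int(port)


ipv4 = split_host_port("192.0.2.10:3022")
ipv6 = split_host_port("[2001:db8::1]:3022")
```

The same idea could be carried over to the transform by splitting on the last separator instead of the first, but that change is untested here.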

timoguin commented 11 months ago

Closed via #153