shaeqahmed opened this issue 1 year ago
A Matano managed log source for osquery has been requested by a few community users; it would be great to support pulling logs from osquery (e.g. query results, diffs) and storing them in a Matano data lake for endpoint context.
You're welcome to have my work on this once I've ironed it out a bit. Let me know how best to contribute.
Just for context, I created this ingestion against logs that are directed to a Kinesis Firehose by our osquery management system (in this case, FleetDM), which then dumps them into the ingestion bucket.
Parts of the osquery result logs depend on configuration, e.g. the `hostIdentifier` field can contain a UUID or a hostname, and `numerics` indicates whether numbers were logged as JSON strings or integers.
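For example, a representative scheduled-query result line might look like this (all values illustrative):

```json
{
  "name": "pack/osquery-monitoring/osquery_info",
  "hostIdentifier": "4740D59F-699E-5B29-960B-979AAF9BBEEB",
  "calendarTime": "Tue Jan 10 20:34:25 2023 UTC",
  "unixTime": 1673382865,
  "epoch": 0,
  "counter": 0,
  "numerics": false,
  "decorations": { "username": "alice" },
  "columns": { "pid": "12345", "version": "5.5.1" },
  "action": "added"
}
```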
Looking at the Elastic implementation for osquery result logs, it does a lot, and its schema assumes a lot about the structure of the query data and decorators. This is because osquery integrates into their wider solution, and they have control over the configuration and queries issued to agents, something that Matano cannot control or make assumptions about.
So the approach I've taken is to leave the `columns`, `snapshot`, and `decorations` fields as JSON strings which can be parsed in a detection or as part of a query. Panther takes a similar approach, but they have a generic `map[string]string` type available to them which Matano doesn't have.
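At read time the consumer just re-hydrates the string. A minimal VRL sketch (the `pid` column is hypothetical; a Python detection would do the equivalent with `json.loads`):

```
# re-parse the stored JSON string back into a map
columns = object!(parse_json!(string!(.osquery.result.columns)))

# then read whatever the specific query exposed, e.g. a "pid" column
pid = columns.pid
```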
Here's what that looks like:
name: "osquery_result"
schema:
ecs_field_names:
- ecs.version
- event.action
- event.created
- event.kind
- event.original
- host.id
- rule.name
fields:
- name: osquery
type:
type: struct
fields:
- name: result
type:
type: struct
fields:
- name: action
type: string
- name: calendar_time
type: string
- name: columns
type: string
- name: counter
type: int
- name: decorations
type: string
- name: epoch
type: int
- name: host_identifier
type: string
- name: name
type: string
- name: snapshot
type: string
- name: unix_time
type: int
transform: |
# transform the log to adhere to the basic ECS schema
.event.original = encode_json(.json)
.event.created = .ts
if .json.unixTime != null {
.ts = to_timestamp!(.json.unixTime, "seconds")
}
.osquery.result = del(.json)
.osquery.result.host_identifier = del(.osquery.result.hostIdentifier)
.osquery.result.unix_time = del(.osquery.result.unixTime)
.osquery.result.calendar_time = del(.osquery.result.calendarTime)
del(.osquery.result.numerics)
.rule.name = .osquery.result.name
.event.kind = "event"
.event.type = ["info"]
.event.action = .osquery.result.action
.host.id = .osquery.result.host_identifier
# store the semi-structured string:string data as json
.osquery.result.columns = encode_json(del(.osquery.result.columns))
.osquery.result.snapshot = encode_json(del(.osquery.result.snapshot))
.osquery.result.decorations = encode_json(del(.osquery.result.decorations))
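Given a result line like the sample above, the transform should land the event roughly as follows (abridged sketch; `event.created` omitted and timestamps depend on ingest):

```json
{
  "ts": "2023-01-10T20:34:25Z",
  "event": {
    "kind": "event",
    "type": ["info"],
    "action": "added",
    "original": "{\"name\":\"pack/osquery-monitoring/osquery_info\", ...}"
  },
  "rule": { "name": "pack/osquery-monitoring/osquery_info" },
  "host": { "id": "4740D59F-699E-5B29-960B-979AAF9BBEEB" },
  "osquery": {
    "result": {
      "name": "pack/osquery-monitoring/osquery_info",
      "action": "added",
      "host_identifier": "4740D59F-699E-5B29-960B-979AAF9BBEEB",
      "unix_time": 1673382865,
      "calendar_time": "Tue Jan 10 20:34:25 2023 UTC",
      "epoch": 0,
      "counter": 0,
      "columns": "{\"pid\":\"12345\",\"version\":\"5.5.1\"}",
      "decorations": "{\"username\":\"alice\"}"
    }
  }
}
```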
That covers the basics enough for normalising the results into ECS for storage.
My specific ingestion does a lot more: because I configure a number of decorator queries and have control over the various query results that come through, I'm comfortable reading data out of the `columns` and `decorations` fields and adding to a bunch of other ECS fields (see the sketch below). However, I think that needs to be left up to the user to implement for their specific use cases.
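For illustration, an enrichment of that kind could be spliced into the transform above, before `decorations` is re-encoded as a JSON string. The `username` decorator here is hypothetical and depends entirely on your agent configuration, and `user.name` would also need adding to `ecs_field_names`:

```
# hypothetical: lift a "username" decorator into ECS user.name
# (must run before .osquery.result.decorations is encoded to a string)
if .osquery.result.decorations.username != null {
  .user.name = del(.osquery.result.decorations.username)
}
```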
The status logs take a similar approach to the result logs, but there's more info in here we can rely on to normalise out to ECS.
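For reference, a representative status line might look like this (values illustrative; in my setup the numeric-looking fields `unixTime`, `severity`, and `line` arrive as JSON strings, which is why the transform coerces them):

```json
{
  "hostIdentifier": "4740D59F-699E-5B29-960B-979AAF9BBEEB",
  "calendarTime": "Tue Jan 10 20:34:25 2023 UTC",
  "unixTime": "1673382865",
  "severity": "0",
  "filename": "scheduler.cpp",
  "line": "83",
  "message": "Executing scheduled query pack/osquery-monitoring/osquery_info",
  "version": "5.5.1",
  "decorations": { "username": "alice" }
}
```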
name: "osquery_status"
schema:
ecs_field_names:
- ecs.version
- event.created
- event.kind
- event.original
- event.severity
- host.id
- log.origin.file.line
- log.origin.file.name
- message
- service.name
- service.version
fields:
- name: osquery
type:
type: struct
fields:
- name: status
type:
type: struct
fields:
- name: calendar_time
type: string
- name: decorations
type: string
- name: host_identifier
type: string
- name: unix_time
type: int
transform: |
# transform the log to adhere to the basic ECS schema
.event.original = encode_json(.json)
.event.created = .ts
if .json.unixTime != null {
.ts = to_timestamp!(.json.unixTime, "seconds")
}
if is_string(.json.unixTime) {
.json.unixTime = parse_int!(.json.unixTime)
}
.osquery.status = del(.json)
.osquery.status.host_identifier = del(.osquery.status.hostIdentifier)
.osquery.status.unix_time = del(.osquery.status.unixTime)
.osquery.status.calendar_time = del(.osquery.status.calendarTime)
.event.kind = "event"
.event.type = ["info"]
.event.severity = parse_int!(del(.osquery.status.severity))
.host.id = .osquery.status.host_identifier
.log.origin.file.name = del(.osquery.status.filename)
.log.origin.file.line = parse_int!(del(.osquery.status.line))
.message = del(.osquery.status.message)
.service.name = "osquery"
.service.version = del(.osquery.status.version)
# store the remaining semi-structured string:string data as json
.osquery.status.decorations = encode_json(del(.osquery.status.decorations))