Configuring Logstash to Send Log Data to Zebrium
In Zebrium
Retrieve your Zebrium URL and Auth Token for Configuring the Logstash HTTP Output Plugin
1. Log in to your Zebrium portal user account.
2. If you have not yet ingested log event data into Zebrium, skip to Step 5. Otherwise, continue with Step 3.
3. From the User menu area, select the Account Settings gear icon.
4. Click the Log/Metrics Collector tab.
5. Click the ZELK Stack Log Collector button.
6. Note the ZE_LOG_COLLECTOR_URL and ZE_LOG_COLLECTOR_TOKEN values and store them for later use when configuring Logstash below.
In Logstash
Zebrium Required Fields (in Logstash)
Zebrium requires that certain fields (keys) be defined for each log event. These definitions belong in the filter { } section of the Logstash configuration.
There are five types of Zebrium fields that must be defined in the Logstash filter configuration for proper Incident detection in Zebrium. (Example Logstash configuration is shown below the table.)
| Type | Description | Key Name | Key Definition | Requirement |
|---|---|---|---|---|
| Time | Timestamp/time zone of each log event. | @timestamp | Timestamp of each log event (rather than the time the event was processed by Logstash, if possible). | Required |
| | | @ze_timezone | Time zone of each log event, e.g. "America/Los_Angeles". | Optional. Note: UTC is the default. |
| Log Generator | Indicates the source of the log event. | @ze_deployment_name | Identifies the environment or deployment, e.g. "production", "qa", "dev". | Recommended |
| | | @ze_host | Host name identifier. | Required |
| | | @ze_logtype | The basename of the log source, e.g. "access.log", "syslog". In the Zebrium UI, this appears as the logtype. In the container world, this would typically be the app name. | Required |
| | | @ze_gid | If there are multiple processes within the same host, you can specify a distinct generator ID for each of them. In the container world, this would typically be a container name. | Optional |
| Log Events Wrapped in JSON | If the application or host log events are simply wrapped in JSON and contain a field like "message" : "2020-10-23 04:17:37 mars INFO systemd[1]: Stopped PostgreSQL RDBMS.", then these keys need to be defined. | @ze_msg | If the JSON contains a field representing a typical "log event" (<PREFIX INFORMATION> <EVENT TEXT>), set this Zebrium key to the value of that "log event" field. Zebrium's machine learning will then structure this field into an Event Type (etype) used for Incident detection. | Required (if your log events are wrapped in JSON) |
| | | @ze_sev | If @ze_msg does not contain a severity, this field can be used to explicitly set the severity based on some other criterion or field from the payload. | Optional |
| | | @ze_pfx | If @ze_msg will not contain prefix information (for example, a process name), this field can be used to explicitly set additional prefix values based on some other criterion or field from the payload. | Optional |
| Fully Structured JSON | If the events are fully structured JSON and are NOT simply Log Events Wrapped in JSON (as above), then this key needs to be defined. | @ze_etype | For fully structured JSON events, there is no log event text that can be automatically structured into an Event Type (etype), so the etype must be explicitly defined. Use a value (or a set of values concatenated together) from the structured payload that best represents the unique type of an event. The expected cardinality of @ze_etype is hundreds to a few thousand at most. | Required (if your log events are fully structured JSON) |
| External ID Mapping | Maps events in Zebrium to corresponding events in Elasticsearch. | @ze_xid | Assigns a unique ID (UUID) to every log event so that events in Zebrium can be mapped to corresponding events in Elasticsearch through a common UUID. | Required (if using Kibana/Elasticsearch to view Zebrium Incidents) |
| Metrics from Metricbeat | Sends metrics from Metricbeat to Zebrium so metric anomalies can be used to augment auto-detected Incidents from log events. | @ze_stats | Copy metrics from the Metricbeat "system" key to @ze_stats using the Logstash "copy" operation. | Required for viewing Metricbeat metrics in Zebrium |
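For reference, the @ze_stats mapping in the last row amounts to a single copy operation inside a mutate filter (shown in context in the complete example at the end of this page):

mutate {
  copy => { "[system]" => "@ze_stats" }   # deep-copy the metricbeat "system" metrics to @ze_stats
}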
Configuring Logstash Filters for Zebrium Required Fields (in Logstash)
- Edit the appropriate Logstash configuration file to define the required ZELK Stack filter definitions. All of these definitions go inside the filter { } section of the configuration.
- TIME FIELDS
  - @timestamp should contain the timestamp from the log event (not the timestamp when the event was processed by Logstash). This is important for proper Incident detection in Zebrium.
  - Multi-line event processing should be enabled so that child log event lines are concatenated to the parent event with newlines.
  - The following shows an example configuration for meeting these requirements.

#----------------------------------------------------------------------#
# Input Filter definition for processing multi-line events (if needed) #
#----------------------------------------------------------------------#
codec => multiline {
  pattern => "^%{TIMESTAMP_ISO8601}"
  negate => true
  what => "previous"
}

#------------------------------------------------------------------------------------------#
# Grok and Date Filter for capturing log event timestamp in @timestamp                      #
# If it is not possible to easily capture the event timestamp as @timestamp as shown here,  #
# it is OK to leave @timestamp as-is (i.e. use the Logstash-generated timestamp)            #
#------------------------------------------------------------------------------------------#
grok {
  match => [ "message", "(?m)%{TIMESTAMP_ISO8601:logdate}" ]   # Note the multi-line capture pattern (?m)
}
date {
  # This will set @timestamp
  match => [ "logdate", "yyyy-MM-dd HH:mm:ss,SSS", "yyyy-MM-dd HH:mm:ss" ]
  timezone => "America/Los_Angeles"
  remove_field => ["logdate"]
}

#---------------------------------------#
# Capture @ze_timezone                  #
# If not specified, UTC will be assumed #
#---------------------------------------#
mutate {
  # Specify the time zone (IANA TZ name) if your log timestamps are missing time zone info; otherwise UTC is assumed (optional).
  add_field => { "@ze_timezone" => "America/Los_Angeles" }
}
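If your logs carry syslog-style timestamps rather than ISO8601, the same approach should work with the standard SYSLOGTIMESTAMP grok pattern (a sketch; verify the date formats against your actual events):

grok {
  match => [ "message", "(?m)%{SYSLOGTIMESTAMP:logdate}" ]   # e.g. "Oct 23 04:17:37"
}
date {
  match => [ "logdate", "MMM dd HH:mm:ss", "MMM  d HH:mm:ss" ]   # syslog timestamps omit the year; Logstash assumes the current year
  remove_field => ["logdate"]
}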
- LOG GENERATOR FIELDS

#-----------------------------------------------------------------#
# Mutate Filter for capturing logtype, host and gid               #
# PLEASE READ CAREFULLY - YOU MUST SUBSTITUTE THE                 #
# RIGHT-HAND SIDE OF THE ASSIGNMENTS WITH YOUR FIELD NAMES/VALUES #
#-----------------------------------------------------------------#
mutate {
  add_field => { "@ze_deployment_name" => "%{my_deployment}" }   # assumes field "my_deployment" is part of the payload (recommended)
  add_field => { "@ze_host" => "%{host}" }                       # assumes field "host" is part of the payload (required)
  add_field => { "@ze_logtype" => "%{logtype}" }                 # assumes field "logtype" is part of the payload (required)
  add_field => { "@ze_gid" => "%{container_name}" }              # assumes field "container_name" is part of the payload (optional)
}
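If your events arrive via a Beats input, the equivalent values are usually nested under ECS-style keys, so the right-hand sides would use nested field references instead (a sketch; the complete Beats example at the end of this page shows this in full):

mutate {
  add_field => { "@ze_host" => "%{[host][hostname]}" }   # Beats ships the host name under [host][hostname]
}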
- LOG EVENTS WRAPPED IN JSON FIELDS
  - This configuration is required if you have a "message" field in the JSON containing an unstructured log event. In that case, we will structure the message and create an Event Type automatically for Incident detection.

#-----------------------------------------------------------------#
# Required if your log events are wrapped in JSON                 #
# PLEASE READ CAREFULLY - YOU MUST SUBSTITUTE THE                 #
# RIGHT-HAND SIDE OF THE ASSIGNMENTS WITH YOUR FIELD NAMES/VALUES #
#-----------------------------------------------------------------#
mutate {
  add_field => { "@ze_msg" => "%{message}" }           # Capture the unstructured log event from the message field - Zebrium will automatically structure this into an etype (required)
  add_field => { "@ze_sev" => "%{[log][severity]}" }   # Capture the severity explicitly since the "message" field does not contain severity (optional)
  add_field => { "@ze_pfx" => "%{[log][process]}" }    # Capture the process name and add it to the log event prefix so it is part of the automatic structuring (optional)
}
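Field references such as [log][severity] assume the JSON payload has already been parsed into event fields. If your events arrive as raw JSON strings in "message", parse them first, for example with the json filter (a minimal sketch; adjust the source field to match your payload):

json {
  source => "message"   # parse the raw JSON string in "message" into event fields
}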
- FULLY STRUCTURED JSON FIELDS
  - If the log events being sent to Zebrium are fully structured JSON (i.e. NOT simply syslog-style events wrapped in a JSON payload), an Event Type definition for each "unique" log message type is required for proper Incident detection.
  - NOTE: This configuration is not required if you have a "message" field in the JSON containing an unstructured log event. In that case, we will structure the message and create an Event Type automatically.

#-----------------------------------------------------------------#
# Required if your log events are Fully Structured JSON           #
# PLEASE READ CAREFULLY - YOU MUST SUBSTITUTE THE                 #
# RIGHT-HAND SIDE OF THE ASSIGNMENTS WITH YOUR FIELD NAMES/VALUES #
#-----------------------------------------------------------------#
mutate {
  add_field => { "@ze_etype" => "%{some_user_defined_etype}" }   # Typically a key (or concatenation of multiple keys) that formulates a name for this "type" of event.
}
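As the table above notes, concatenating a few stable keys from the payload is a common way to form the etype while keeping its cardinality low. A sketch, using hypothetical [event][category] and [event][action] fields:

mutate {
  add_field => { "@ze_etype" => "%{[event][category]}_%{[event][action]}" }   # hypothetical fields - substitute keys that identify the type of event in your payload
}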
- EXTERNAL ID MAPPING FIELD
  - Note: This is not part of a mutate filter.

uuid {
  target => "@ze_xid"   # Generate a unique ID and assign it to @ze_xid
}
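Note: the uuid filter is provided by the logstash-filter-uuid plugin; depending on your Logstash distribution it may need to be installed first, e.g. with bin/logstash-plugin install logstash-filter-uuid.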
- SAVE YOUR CONFIGURATION FILE
Configuring Log Event Output to Zebrium (in Logstash)
- Edit the appropriate Logstash configuration file to define the required ZELK Stack output definition.
- Add the following output definition for Zebrium, substituting ZE_LOG_COLLECTOR_URL and ZE_LOG_COLLECTOR_TOKEN with the values from Step 6 of "Retrieve your Zebrium URL and Auth Token for Configuring the Logstash HTTP Output Plugin" above.

output {
  if <SOME_CONDITION_IS_TRUE> {
    http {
      format => "json_batch"
      http_method => "post"
      url => "<ZE_LOG_COLLECTOR_URL>/log/api/v2/ingest?log_source=logstash&log_format=json_batch"
      headers => ["authtoken", "<ZE_LOG_COLLECTOR_TOKEN>"]
    }
  }
}
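The conditional guard ensures that only Zebrium-bound events reach this output. For example, if you flag Zebrium clones with a metadata field as in the complete example at the end of this page, the condition becomes:

output {
  if [@metadata][zebrium] {
    http {
      format => "json_batch"
      http_method => "post"
      url => "<ZE_LOG_COLLECTOR_URL>/log/api/v2/ingest?log_source=logstash&log_format=json_batch"
      headers => ["authtoken", "<ZE_LOG_COLLECTOR_TOKEN>"]
    }
  }
}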
- SAVE YOUR CONFIGURATION FILE
Reload Logstash Configuration
Reload your Logstash configuration (see the Logstash documentation for the reload procedure) to pick up all changes. Data will now be ingested into Zebrium.
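For example, if Logstash runs as a systemd service, restarting it (systemctl restart logstash) will pick up the changes; alternatively, starting Logstash with the --config.reload.automatic option applies configuration edits without a restart.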
Complete example for filebeat, winlogbeat and metricbeat data
It is highly recommended that you read this carefully and follow the sample provided.
input {
  beats {
    port => 5044
  }
}

filter {
  #--------------------------------------------#
  # Add the UUID to all events before          #
  # cloning a copy for the Zebrium-only fields #
  #--------------------------------------------#
  uuid {
    target => "@ze_xid"   # Generate a unique ID and assign it to @ze_xid
  }
  #---------------------------------------------#
  # Make a clone of the message so we only send #
  # Zebrium add-ons to Zebrium and not to other #
  # existing outputs like Elasticsearch         #
  #---------------------------------------------#
  clone {
    clones => ['zebrium']
  }
  #------------------------------------#
  # Add Zebrium specifics to the clone #
  #------------------------------------#
  if( [type] == 'zebrium' ) {
    #--------------------------------------------------------------#
    # Common attributes across metricbeat, filebeat and winlogbeat #
    #--------------------------------------------------------------#
    mutate {
      add_field => { "[@metadata][zebrium]" => "true" }
    }
    mutate {
      add_field => { "@ze_deployment_name" => "mydeployment01" }
    }
    if( [host][hostname] ) {
      mutate {
        add_field => { "@ze_host" => "%{[host][hostname]}" }
      }
    } else if ( [host][name] ) {
      mutate {
        add_field => { "@ze_host" => "%{[host][name]}" }
      }
    }
    if( [@ze_host] ) {
      mutate {
        gsub => [ "@ze_host", "^([^\.]+)\..*$", "\1" ]   # Use the host name without the fully qualified domain
      }
    } else {
      mutate {
        add_field => { "@ze_host" => "unknown" }
      }
    }
    #------------------------------#
    # metricbeat specific captures #
    #------------------------------#
    if( [agent][type] and [agent][type] == "metricbeat" ) {
      if( [system] ) {
        mutate {
          copy => { "[system]" => "@ze_stats" }   # Zebrium needs a deep copy of the metrics, defined as @ze_stats
        }
      } else if( [error][message] ) {
        # Metricbeat failed to collect data - capture the error as a regular log event
        mutate {
          add_field => { "@ze_msg" => "%{[error][message]}" }
          add_field => { "@ze_time" => "%{@timestamp}" }
          add_field => { "@ze_sev" => "ERROR" }
          add_field => { "@ze_logtype" => "metricbeat" }
        }
      }
    }
    #------------------------------#
    # winlogbeat specific captures #
    #------------------------------#
    if( [agent][type] and [agent][type] == "winlogbeat" ) {
      if( [log][level] ) {
        mutate {
          add_field => { "@ze_sev" => "%{[log][level]}" }
        }
      }
      if( [message] ) {
        mutate {
          add_field => { "@ze_msg" => "%{[message]}" }
          add_field => { "@ze_time" => "%{@timestamp}" }
        }
      }
      if( [event][provider] ) {
        mutate {
          add_field => { "@ze_logtype" => "%{[event][provider]}" }
        }
      } else if( [event][module] ) {
        mutate {
          add_field => { "@ze_logtype" => "%{[event][module]}" }
        }
      } else {
        mutate {
          add_field => { "@ze_logtype" => "winlogbeat" }
        }
      }
      if [@ze_logtype] and [@ze_logtype] =~ "^Microsoft\-Windows\-" {
        # Sometimes the provider starts with Microsoft-Windows-, so strip that extraneous prefix and use the remainder as the logtype
        mutate {
          gsub => [ "@ze_logtype", "^Microsoft\-Windows\-(.*)$", "\1" ]
        }
      }
    }
    #----------------------------#
    # filebeat specific captures #
    #----------------------------#
    if( [agent][type] and [agent][type] == "filebeat" ) {
      if( [message] ) {
        mutate {
          add_field => { "@ze_msg" => "%{[message]}" }
        }
      }
      if( [log][file][path] ) {
        grok {
          match => [ "[log][file][path]", "%{GREEDYDATA}[\\/]%{GREEDYDATA:logtype}\.log" ]
        }
        mutate {
          add_field => { "@ze_logtype" => "%{logtype}" }
          remove_field => [ "logtype" ]
        }
        mutate {
          # Sometimes the log filename starts with the hostname; remove it so all logs of the same type are grouped together
          gsub => [ "@ze_logtype", "^%{@ze_host}([^\d]+).*$", "\1" ]
        }
      } else {
        mutate {
          add_field => { "@ze_logtype" => "filebeatlog" }
        }
      }
    }
  } # END OF ZEBRIUM
}

output {
  # SEND ZEBRIUM DATA TO ZEBRIUM ONLY
  if [@metadata][zebrium] {
    http {
      format => "json_batch"
      http_method => "post"
      url => "<ZE_LOG_COLLECTOR_URL>/log/api/v2/ingest?log_source=logstash&log_format=json_batch"
      headers => ["authtoken", "<ZE_LOG_COLLECTOR_TOKEN>"]
      proxy => "<proxy>"   # Optional: remove if requests do not go through a proxy
    }
  # THEN SEND DATA AS WAS DONE BEFORE ADDING ZEBRIUM
  } else if [@metadata][pipeline] {
    elasticsearch {
      hosts => ["https://localhost:9200"]
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      pipeline => "%{[@metadata][pipeline]}"
      ssl => true
      ssl_certificate_verification => true
      cacert => '/etc/logstash/certs/ca.crt'
      user => "elastic"
      password => "${ES_PW}"
    }
  } else {
    elasticsearch {
      hosts => ["https://localhost:9200"]
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      pipeline => "beats"
      ssl => true
      ssl_certificate_verification => true
      cacert => '/etc/logstash/certs/ca.crt'
      user => "elastic"
      password => "${ES_PW}"
    }
  }
}
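Before reloading, it is worth validating the combined configuration, for example with bin/logstash --config.test_and_exit -f /etc/logstash/conf.d/ (adjust the path to your configuration location), which parses the configuration and reports any syntax errors without starting the pipeline.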