Configuring Logstash to Send Log Data to Zebrium
In Zebrium
Retrieve your Zebrium URL and Auth Token for Configuring the Logstash HTTP Output Plugin
- Log in to your Zebrium portal user account.
- From the User menu area in Zebrium, click on the Settings (hamburger) Menu.
- Select Integrations and Collectors.
- Click on Other.
- Note the ZE_LOG_COLLECTOR_URL and ZE_LOG_COLLECTOR_TOKEN values and store them for later use when configuring Logstash below.
In Logstash
Zebrium Required Fields (in Logstash)
Zebrium requires that certain fields (keys) be defined for each log event. These definitions belong in the "filter" section of the Logstash configuration.
There are 4 types of Zebrium fields that require definition in the Logstash filter configuration for proper Incident detection in Zebrium. (Example Logstash configuration is shown below the table):
| Type | Description | Key Name | Key Definition | Requirement |
|---|---|---|---|---|
| Time | Timestamp/time zone of each log event. | @timestamp | Timestamp of each log event (rather than the time the event was processed by Logstash if possible). | Required |
| | | @ze_timezone | Time zone of each log event. E.g. "America/Los_Angeles" | Optional (Note: UTC is the default) |
| Log Generator | Indicates the source of the log event. | @ze_deployment_name | Identifies the environment or application domain. In the Zebrium UI this is known as the Service Group (see Note on Service Groups below). E.g. "production", "dev", "acme_calendar_app" | Recommended |
| | | @ze_host | Host name identifier | Required |
| | | @ze_logtype | The basename of the log source. E.g. "access.log", "syslog". In the Zebrium UI, it will be the logtype. In the container world, this would probably be the app name. | Required |
| Log Events Wrapped in JSON | If the application or host log events are simply wrapped in a JSON and contain a field like "message" : "2020-10-23 04:17:37 mars INFO systemd[1]: Stopped PostgreSQL RDBMS.", then these keys need to be defined. | @ze_msg | If the JSON contains a field representing a typical "log event" <PREFIX INFORMATION> <EVENT TEXT>, then this Zebrium key should be set to the value of that "log event". Zebrium's machine learning will then structure this field into an Event Type (etype) used for Incident detection. | Required (if your log events are wrapped in JSON) |
| | | @ze_sev | If @ze_msg does not contain a severity, then this field can be used to explicitly set the severity based on some other criteria or field from the payload. | Optional |
| External ID Mapping | Map events in Zebrium to corresponding events in Elasticsearch | @ze_xid | Assign a unique id (UUID) to every log event so that events in Zebrium can be mapped to corresponding events in Elasticsearch through a common UUID | Required (if using Kibana/Elasticsearch to view Zebrium Incidents) |
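As a rough illustration of the requirements in the table, the following Python sketch checks a single event (as a dictionary) for the required keys. The key names come from the table; the function name, its parameters and the sample event are hypothetical and exist only for illustration. It is not part of Logstash or Zebrium.

```python
# Key names come from the table above; this checker itself is a hypothetical helper.
ALWAYS_REQUIRED = ("@timestamp", "@ze_host", "@ze_logtype")


def missing_zebrium_keys(event, wrapped_in_json=False, uses_elasticsearch=False):
    """Return the required Zebrium keys that are absent or empty in `event`."""
    required = list(ALWAYS_REQUIRED)
    if wrapped_in_json:
        required.append("@ze_msg")  # required when log events are wrapped in JSON
    if uses_elasticsearch:
        required.append("@ze_xid")  # required when viewing Incidents via Kibana/Elasticsearch
    return [key for key in required if not event.get(key)]


# Made-up sample event that deliberately omits @ze_host.
sample_event = {
    "@timestamp": "2020-10-23T04:17:37Z",
    "@ze_deployment_name": "acme_calendar_app",  # recommended, not required
    "@ze_logtype": "syslog",
    "@ze_msg": "2020-10-23 04:17:37 mars INFO systemd[1]: Stopped PostgreSQL RDBMS.",
}
print(missing_zebrium_keys(sample_event, wrapped_in_json=True))
```

For the sample event the check reports @ze_host as missing, which is the kind of gap the filter configuration below is meant to close.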
Service Groups
A Service Group defines a failure domain boundary for anomaly correlation. It lets you collect logs from multiple applications while keeping each application's logs isolated, so that logs from unrelated applications are not mixed in a Root Cause Report. In the Logstash configuration, the Service Group is set through @ze_deployment_name. If you are uploading multiple logs from different services in the same application, specify the same Service Group for each log event from that application. For example, if you have a database log, an application log and a middleware log for the Acme Calendar application, use one Service Group for all of them, for example acme_calendar_app.
Configuring Logstash Filters for Zebrium Required Fields (in Logstash)
- Edit the appropriate Logstash configuration file to add the filter definitions that Zebrium requires. All of these definitions are within the filter { } section of the configuration.
- TIME FIELDS
  - @timestamp should contain the timestamp from the log event (not the timestamp when processed by Logstash). This is important for proper Incident detection in Zebrium.
  - Processing of multi-line events should be enabled so that child log event lines are concatenated to the parent event with newlines.
  - The following shows an example configuration for meeting these requirements.
    #----------------------------------------------------------------------#
    # Input Filter definition for processing multi-line events (if needed) #
    #----------------------------------------------------------------------#
    # Note: a codec is set inside an input plugin, not inside filter { }
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate => true
      what => "previous"
    }
    #------------------------------------------------------------------------------------------#
    # Grok and Date Filter for capturing log event timestamp in @timestamp                     #
    # If it is not possible to easily capture the event timestamp as @timestamp as shown here, #
    # it is OK to leave @timestamp as-is (i.e. use the logstash generated timestamp)           #
    #------------------------------------------------------------------------------------------#
    grok {
      match => [ "message", "(?m)%{TIMESTAMP_ISO8601:logdate}" ] # Note the multi-line capture pattern (?m)
    }
    date {
      # This will set @timestamp
      match => [ "logdate", "yyyy-MM-dd HH:mm:ss,SSS", "yyyy-MM-dd HH:mm:ss" ]
      timezone => "America/Los_Angeles"
      remove_field => ["logdate"]
    }
    #---------------------------------------#
    # Capture @ze_timezone                  #
    # If not specified, UTC will be assumed #
    #---------------------------------------#
    mutate {
      # Specify timezone (IANA TZ Names) if your log timestamps are missing the timezone info, otherwise UTC is assumed (optional).
      add_field => { "@ze_timezone" => "America/Los_Angeles" }
    }
- LOG GENERATOR FIELDS
    #-----------------------------------------------------------------#
    # Mutate Filter for capturing logtype, host and deployment name  #
    # PLEASE READ CAREFULLY - YOU MUST SUBSTITUTE THE                 #
    # RIGHT-HAND SIDE OF THE ASSIGNMENTS WITH YOUR FIELD NAMES/VALUES #
    #-----------------------------------------------------------------#
    mutate {
      add_field => { "@ze_deployment_name" => "%{my_deployment}" } # assumes field "my_deployment" is part of the payload (recommended)
      add_field => { "@ze_host" => "%{host}" }                     # assumes field "host" is part of the payload (required)
      add_field => { "@ze_logtype" => "%{logtype}" }               # assumes field "logtype" is part of the payload (required)
    }
- LOG EVENTS WRAPPED IN JSON FIELDS
  - This configuration is required if you have a "message" field in the JSON containing an unstructured log event. In that case, we will structure the message and create an Event-Type automatically for Incident Detection.
    #-----------------------------------------------------------------#
    # Required if your log events are wrapped in JSON                 #
    # PLEASE READ CAREFULLY - YOU MUST SUBSTITUTE THE                 #
    # RIGHT-HAND SIDE OF THE ASSIGNMENTS WITH YOUR FIELD NAMES/VALUES #
    #-----------------------------------------------------------------#
    mutate {
      add_field => { "@ze_msg" => "%{message}" }         # Capture the unstructured log event from the message field - Zebrium will automatically structure this into an etype (required)
      add_field => { "@ze_sev" => "%{[log][severity]}" } # Capture the severity explicitly since "message" field does not contain severity (optional)
      add_field => { "@ze_pfx" => "%{[log][process]}" }  # Capture the process name and add to the log event prefix so it's part of the automatic structuring (optional)
    }
- EXTERNAL ID MAPPING FIELD
  - Note: This is not part of a mutate filter
    uuid {
      target => "@ze_xid" # Generate a Unique ID and assign to @ze_xid
    }
- SAVE YOUR CONFIGURATION FILE
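The multi-line codec settings in the TIME FIELDS step (negate => true, what => "previous") mean that any line which does not start with a timestamp is appended to the previous event. The Python sketch below imitates that rule outside Logstash so the behaviour is easier to see. It is an approximation under stated assumptions: the regular expression is a simplified stand-in for grok's TIMESTAMP_ISO8601 pattern, and the function name and sample lines are made up.

```python
import re

# Simplified stand-in for grok's TIMESTAMP_ISO8601 (assumption: date, separator, time).
EVENT_START = re.compile(r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}")


def join_multiline(lines):
    """Group raw lines into events: a line that does not start with a
    timestamp is appended to the previous event, separated by a newline."""
    events = []
    for line in lines:
        if EVENT_START.match(line) or not events:
            events.append(line)          # a new parent event starts here
        else:
            events[-1] += "\n" + line    # child line joins the previous event
    return events


raw_lines = [
    "2020-10-23 04:17:37 mars ERROR worker failed",
    "  at step_one()",
    "  at step_two()",
    "2020-10-23 04:17:38 mars INFO worker restarted",
]
for event in join_multiline(raw_lines):
    print(repr(event))
```

With the sample input, the two indented lines end up inside the first event and the second timestamped line starts a new event.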
Configuring Log Event Output to Zebrium (in Logstash)
- Edit the appropriate Logstash configuration file to add the output definition that Zebrium requires.
- Add the following output definition for Zebrium and substitute ZE_LOG_COLLECTOR_URL and ZE_LOG_COLLECTOR_TOKEN with the values noted in the last step of "Retrieve your Zebrium URL and Auth Token for Configuring the Logstash HTTP Output Plugin" above.
    output {
      if <SOME_CONDITION_IS_TRUE> {
        http {
          format => "json_batch"
          http_method => "post"
          url => "<ZE_LOG_COLLECTOR_URL>/log/api/v2/ingest?log_source=logstash&log_format=json_batch"
          headers => ["authtoken", "<ZE_LOG_COLLECTOR_TOKEN>"]
        }
      }
    }
- SAVE YOUR CONFIGURATION FILE
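To make the shape of the outgoing request concrete, the following Python sketch builds (but does not send) a request similar to the one described by the output definition above: a POST to the ingest path with the authtoken header and a JSON array of events as the body, which is how the json_batch format groups events. The function name, the example URL and the token value are hypothetical placeholders, and trimming a trailing slash from the collector URL is a convenience of this sketch, not something Logstash does.

```python
import json
import urllib.request

INGEST_PATH = "/log/api/v2/ingest?log_source=logstash&log_format=json_batch"


def build_ingest_request(collector_url, token, events):
    """Build a POST request carrying a batch of events as one JSON array."""
    url = collector_url.rstrip("/") + INGEST_PATH
    body = json.dumps(events).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"authtoken": token, "Content-Type": "application/json"},
    )


# Placeholder values; substitute your own ZE_LOG_COLLECTOR_URL and ZE_LOG_COLLECTOR_TOKEN.
request = build_ingest_request(
    "https://zebrium.example.com/",
    "EXAMPLE_TOKEN",
    [{"@ze_host": "mars", "@ze_logtype": "syslog"}],
)
print(request.get_method(), request.full_url)
```

The request object is only constructed here, which makes it a safe way to inspect the URL, header and body before pointing Logstash at the collector.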
Reload Logstash Configuration
Reload your Logstash configuration (see the Logstash documentation on reloading the configuration) to pick up all changes. Data will now be ingested into Zebrium.
Complete example for filebeat and winlogbeat data
It is highly recommended that you read this example carefully and follow the sample provided.
input {
  beats {
    port => 5044
  }
}
filter {
  #--------------------------------------------#
  # Add the UUID to all events before          #
  # cloning a copy for the zebrium only fields #
  #--------------------------------------------#
  uuid {
    target => "@ze_xid" # Generate a Unique ID and assign to @ze_xid
  }
  #---------------------------------------------#
  # Make a clone of the message so we only send #
  # Zebrium add-ons to Zebrium and not to other #
  # existing outputs like elastic               #
  #---------------------------------------------#
  clone {
    clones => ['zebrium']
  }
  #------------------------------------#
  # Add Zebrium specifics to the clone #
  #------------------------------------#
  if( [type] == 'zebrium' ) {
    #--------------------------------------------------------------#
    # Common attributes across filebeats, winlogbeats              #
    #--------------------------------------------------------------#
    mutate {
      add_field => { "[@metadata][zebrium]" => true }
    }
    mutate {
      add_field => { "@ze_deployment_name" => "mydeployment01" }
    }
    if( [host][hostname] ) {
      mutate {
        add_field => { "@ze_host" => "%{[host][hostname]}" }
      }
    } else if ( [host][name] ) {
      mutate {
        add_field => { "@ze_host" => "%{[host][name]}" }
      }
    }
    if( [@ze_host] ) {
      mutate {
        # Use hostname without fully qualified domain: keep the first label and drop everything after it
        gsub => [ "@ze_host", "^([^\.]+).*$", "\1" ]
      }
    } else {
      mutate {
        add_field => { "@ze_host" => "unknown" }
      }
    }
    #------------------------------#
    # winlogbeat specific captures #
    #------------------------------#
    if( [agent][type] and [agent][type] == "winlogbeat" ) {
      if( [log][level] ) {
        mutate {
          add_field => { "@ze_sev" => "%{[log][level]}" }
        }
      }
      if( [message] ) {
        mutate {
          add_field => { "@ze_msg" => "%{[message]}" }
          add_field => { "@ze_time" => "%{@timestamp}" }
        }
      }
      if( [event][provider] ) {
        mutate {
          add_field => { "@ze_logtype" => "%{[event][provider]}" }
        }
      } else if( [event][module] ) {
        mutate {
          add_field => { "@ze_logtype" => "%{[event][module]}" }
        }
      } else {
        mutate {
          add_field => { "@ze_logtype" => "winlogbeat" }
        }
      }
      if [@ze_logtype] and [@ze_logtype] =~ "^Microsoft\-Windows\-" {
        # Sometimes the provider starts with Microsoft-Windows-, so remove that extraneous string and use the remainder as the logtype
        mutate {
          gsub => [ "@ze_logtype", "^Microsoft\-Windows\-(.*)$", "\1" ]
        }
      }
    }
    #----------------------------#
    # filebeat specific captures #
    #----------------------------#
    if( [agent][type] and [agent][type] == "filebeat" ) {
      if( [message] ) {
        mutate {
          add_field => { "@ze_msg" => "%{[message]}" }
        }
      }
      if( [log][file][path] ) {
        grok {
          match => [ "[log][file][path]","%{GREEDYDATA}[\\/]%{GREEDYDATA:logtype}\.log" ]
        }
        mutate {
          add_field => { "@ze_logtype" => "%{logtype}" }
          remove_field => [ "logtype" ]
        }
        mutate {
          # Sometimes the log filename starts with the hostname, remove that so all logs of the same type are grouped together
          gsub => [ "@ze_logtype", "^%{@ze_host}([^\d]+).*$", "\1" ]
        }
      } else {
        mutate {
          add_field => { "@ze_logtype" => "filebeatlog" }
        }
      }
    }
  } # END OF ZEBRIUM
}
output {
  # SEND ZEBRIUM DATA TO ZEBRIUM ONLY
  if [@metadata][zebrium] {
    http {
      format => "json_batch"
      http_method => "post"
      url => "<ZE_LOG_COLLECTOR_URL>/log/api/v2/ingest?log_source=logstash&log_format=json_batch"
      headers => ["authtoken", "<ZE_LOG_COLLECTOR_TOKEN>"]
      proxy => "<proxy>"
    }
  # THEN SEND DATA AS WAS DONE BEFORE ADDING ZEBRIUM
  } else if [@metadata][pipeline] {
    elasticsearch {
      hosts => ["https://localhost:9200"]
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      pipeline => "%{[@metadata][pipeline]}"
      ssl => true
      ssl_certificate_verification => true
      cacert => '/etc/logstash/certs/ca.crt'
      user => elastic
      password => "${ES_PW}"
    }
  } else {
    elasticsearch {
      hosts => ["https://localhost:9200"]
      index => "%{[@metadata][beat]}-%{[@metadata][version]}"
      pipeline => beats
      ssl => true
      ssl_certificate_verification => true
      cacert => '/etc/logstash/certs/ca.crt'
      user => elastic
      password => "${ES_PW}"
    }
  }
}
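The example normalizes several values before sending them: the host name is meant to be used without its domain, a leading Microsoft-Windows- is dropped from the winlogbeat provider, and the filebeat logtype is taken from the file name in the log path. The Python sketch below mirrors the intent of those three steps so they can be tried on sample values outside Logstash. The function names and sample inputs are hypothetical, and the regular expressions are plain-regex approximations of the grok and gsub patterns, so treat this as a way to reason about the rules and not as a substitute for testing the pipeline itself.

```python
import re


def short_hostname(host):
    """Keep only the first label of a host name (drop the domain part)."""
    return re.sub(r"^([^.]+).*$", r"\1", host)


def strip_windows_prefix(provider):
    """Drop a leading 'Microsoft-Windows-' from a winlogbeat provider name."""
    return re.sub(r"^Microsoft-Windows-(.*)$", r"\1", provider)


def logtype_from_path(path):
    """Take the file name between the last path separator and '.log'.
    Returns None when the path does not match (no such fallback exists in grok)."""
    match = re.search(r".*[\\/](?P<logtype>.*)\.log", path)
    return match.group("logtype") if match else None


print(short_hostname("mars.example.com"))
print(strip_windows_prefix("Microsoft-Windows-Security-Auditing"))
print(logtype_from_path("/var/log/nginx/access.log"))
print(logtype_from_path(r"C:\logs\app.log"))
```

Running sample values from your own hosts and log paths through helpers like these is a quick way to predict which @ze_host and @ze_logtype values will show up in Zebrium.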