
Configuring Logstash to Send Log Data to Zebrium

In Zebrium

Retrieve your Zebrium URL and Auth Token for Configuring the Logstash HTTP Output Plugin

  1. Log in to your Zebrium portal user account.
  2. If you have not yet ingested log event data into Zebrium, go to Step 5. Otherwise continue with Step 3.
  3. From the User menu area, select the Account Settings gear icon.
  4. Click the Log/Metrics Collector tab.
  5. Click the ZELK Stack Log Collector button.
  6. Note the ZE_LOG_COLLECTOR_URL and ZE_LOG_COLLECTOR_TOKEN values and store for later use when configuring Logstash below.
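
Before configuring Logstash, you can optionally sanity-check the URL and token with a direct POST to the same ingest endpoint used in the output configuration later in this guide. This is a sketch only: the sample body mirrors the array-of-events shape of the Logstash http output's json_batch format, and the event content is purely illustrative.

```shell
# Substitute <ZE_LOG_COLLECTOR_URL> and <ZE_LOG_COLLECTOR_TOKEN> with the values from Step 6.
curl -X POST \
  -H "authtoken: <ZE_LOG_COLLECTOR_TOKEN>" \
  -H "Content-Type: application/json" \
  "<ZE_LOG_COLLECTOR_URL>/log/api/v2/ingest?log_source=logstash&log_format=json_batch" \
  -d '[{"message": "illustrative test event"}]'
```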

In Logstash

Zebrium Required Fields (in Logstash)

Zebrium requires that certain fields (keys) be defined for each log event. These definitions are part of the "filter" section of the Logstash configuration.

There are five types of Zebrium fields that must be defined in the Logstash filter configuration for proper Incident detection in Zebrium. (Example Logstash configuration is shown below the field definitions):

Time
  @timestamp (Required)
    Timestamp of each log event (rather than the time the event was processed by Logstash, if possible).
  @ze_timezone (Optional)
    Time zone of each log event, e.g. "America/Los_Angeles". Note: UTC is the default.

Log Generator (indicates the source of the log event)
  @ze_deployment_name (Recommended)
    Identifies the environment or deployment, e.g. "production", "qa", "dev".
  @ze_host (Required)
    Host name identifier.
  @ze_logtype (Required)
    The basename of the log source, e.g. "access.log", "syslog". In the Zebrium UI, this appears as the logtype. In the container world, this would typically be the app name.
  @ze_gid (Optional)
    If there are multiple processes within the same host, you can specify a distinct Generator ID for each of those processes. In the container world, this would typically be a container name.

Log Events Wrapped in JSON (if the application or host log events are simply wrapped in JSON and contain a field such as "message" : "2020-10-23 04:17:37 mars INFO systemd[1]: Stopped PostgreSQL RDBMS.", then these keys need to be defined)
  @ze_msg (Required if your log events are wrapped in JSON)
    If the JSON contains a field representing a typical "log event" (<PREFIX INFORMATION> <EVENT TEXT>), set this key to the value of that "log event". Zebrium's machine learning will then structure this field into an Event Type (etype) used for Incident detection.
  @ze_sev (Optional)
    If @ze_msg does not contain a severity, this field can be used to explicitly set the severity based on some other criterion or field from the payload.
  @ze_pfx (Optional)
    If @ze_msg will not contain prefix information (such as a process name), this field can be used to explicitly set additional prefix values based on some other criterion or field from the payload.

Fully Structured JSON (if the events are fully structured JSON and are NOT simply Log Events Wrapped in JSON as above, then this key needs to be defined)
  @ze_etype (Required if your log events are Fully Structured JSON)
    For Fully Structured JSON events, there is no log event text that can be automatically structured into an Event Type (etype), so the etype must be defined explicitly. Use a value (or a set of values concatenated together) from the structured payload that best represents the unique type of an event. We would expect the cardinality of @ze_etype to be in the hundreds, or a few thousand at most.

External ID Mapping (maps events in Zebrium to corresponding events in Elasticsearch)
  @ze_xid (Required if using Kibana/Elasticsearch to view Zebrium Incidents)
    Assign a unique ID (UUID) to every log event so that events in Zebrium can be mapped to their corresponding events in Elasticsearch through a common UUID.

Configuring Logstash Filters for Zebrium Required Fields (in Logstash)

  1. Edit the appropriate Logstash configuration file to define the required ZELK Stack filter definitions. All of these definitions are within the filter { } section of the configuration.

  2. TIME FIELDS
    • @timestamp should contain the timestamp from the log event (not the timestamp when processed by Logstash). This is important for proper Incident detection in Zebrium.
    • Processing multi-line events should be enabled such that child log event lines are concatenated to the parent event with newlines.
    • The following shows an example configuration for meeting these requirements.
    #----------------------------------------------------------------------#
    # Input Filter definition for processing multi-line events (if needed) #
    #----------------------------------------------------------------------#
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate  => true
      what    => "previous"
    }
    
    #------------------------------------------------------------------------------------------#
    # Grok and Date Filter for capturing log event timestamp in @timestamp                     #
    # If it is not possible to easily capture the event timestamp as @timestamp as shown here, #
    # it is OK to leave @timestamp as-is (i.e. use the logstash generated timestamp)           # 
    #------------------------------------------------------------------------------------------#
    grok {
      match => [ "message", "(?m)%{TIMESTAMP_ISO8601:logdate}" ] # Note the multi-line capture pattern (?m)
    }
    date {
      # This will set @timestamp
      match        => [ "logdate", "yyyy-MM-dd HH:mm:ss,SSS", "yyyy-MM-dd HH:mm:ss" ]
      timezone     => "America/Los_Angeles"
      remove_field => ["logdate"]
    }
    
    #---------------------------------------#
    # Capture @ze_timezone                  #
    # If not specified, UTC will be assumed #
    #---------------------------------------#
    mutate {
        add_field => { "@ze_timezone" => "America/Los_Angeles" }  # Specify the time zone (IANA TZ name) if your log timestamps lack time zone info; otherwise UTC is assumed (optional).
    }
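    • Note that the multiline codec fragment above belongs on an input plugin rather than in the filter section. A minimal sketch, assuming a hypothetical file input (the path is illustrative; substitute your own):

```
input {
  file {
    path  => "/var/log/myapp/myapp.log"   # hypothetical path - substitute your own
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate  => true
      what    => "previous"
    }
  }
}
```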
    
  3. LOG GENERATOR FIELDS

    #-----------------------------------------------------------------#
    # Mutate Filter for capturing logtype, host and gid               #
    # PLEASE READ CAREFULLY - YOU MUST SUBSTITUTE THE                 #
    # RIGHT-HAND SIDE OF THE ASSIGNMENTS WITH YOUR FIELD NAMES/VALUES #
    #-----------------------------------------------------------------#
    mutate {
       add_field => { "@ze_deployment_name" => "%{my_deployment}"  } # assumes field "my_deployment"  is part of the payload (recommended)
       add_field => { "@ze_host"            => "%{host}"           } # assumes field "host"           is part of the payload (required)
       add_field => { "@ze_logtype"         => "%{logtype}"        } # assumes field "logtype"        is part of the payload (required)
       add_field => { "@ze_gid"             => "%{container_name}" } # assumes field "container_name" is part of the payload (optional)
    }
    
  4. LOG EVENTS WRAPPED IN JSON FIELDS

    • This configuration is required if you have a "message" field in the JSON containing an unstructured log event. In that case, we will structure the message and create an Event-Type automatically for Incident Detection.
    #-----------------------------------------------------------------#
    # Required if your log events are wrapped in JSON                 #
    # PLEASE READ CAREFULLY - YOU MUST SUBSTITUTE THE                 #
    # RIGHT-HAND SIDE OF THE ASSIGNMENTS WITH YOUR FIELD NAMES/VALUES #
    #-----------------------------------------------------------------#
    mutate {
        add_field => { "@ze_msg"  => "%{message}"         } # Capture the unstructured log event from the message field - Zebrium will automatically structure this into an etype (required)
        add_field => { "@ze_sev"  => "%{[log][severity]}" } # Capture the severity explicitly since "message" field does not contain severity (optional)
        add_field => { "@ze_pfx"  => "%{[log][process]}"  } # Capture the process name and add it to the log event prefix so it's part of the automatic structuring (optional)
    }
    
  5. FULLY STRUCTURED JSON FIELDS

    • If the log events being sent to Zebrium are fully structured JSON (i.e. NOT simply syslog-style events wrapped in a JSON payload), an Event-Type definition for each "unique" log message type is required for proper incident detection.
    • NOTE: This configuration is not required if you have a "message" field in the JSON containing an unstructured log event. In that case, we will structure the message and create an Event-Type automatically.
    #-----------------------------------------------------------------#
    # Required if your log events are Fully Structured JSON           #
    # PLEASE READ CAREFULLY - YOU MUST SUBSTITUTE THE                 #
    # RIGHT-HAND SIDE OF THE ASSIGNMENTS WITH YOUR FIELD NAMES/VALUES #
    #-----------------------------------------------------------------#
    mutate {
       add_field => { "@ze_etype" => "%{some_user_defined_etype}" }  # This would typically be a key (or concatenation of multiple keys) to formulate a name for this "type" of event.
    }
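    • As an illustration of concatenating multiple keys into one etype name (the [event][category] and [event][action] field names here are hypothetical; substitute fields from your own payload):

```
mutate {
   add_field => { "@ze_etype" => "%{[event][category]}_%{[event][action]}" } # hypothetical fields combined into a single etype name
}
```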
    
  6. EXTERNAL ID MAPPING FIELD

    • Note: This is not part of a mutate filter
    uuid {
      target => "@ze_xid"  # Generate a Unique ID and assign to @ze_xid 
    }
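    • The uuid filter is provided by the logstash-filter-uuid plugin; if it is not already present in your Logstash installation, it can be installed with the standard plugin manager:

```shell
bin/logstash-plugin install logstash-filter-uuid
```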
    
  7. SAVE YOUR CONFIGURATION FILE

Configuring Log Event Output to Zebrium (in Logstash)

  1. Edit the appropriate Logstash configuration file to define the required ZELK Stack output definition.
  2. Add the following output definition for Zebrium, substituting ZE_LOG_COLLECTOR_URL and ZE_LOG_COLLECTOR_TOKEN with the values from Step 6 of "Retrieve your Zebrium URL and Auth Token for Configuring the Logstash HTTP Output Plugin" above.

    output {
      if <SOME_CONDITION_IS_TRUE> {
        http {
          format      => "json_batch"
          http_method => "post"
          url         => "<ZE_LOG_COLLECTOR_URL>/log/api/v2/ingest?log_source=logstash&log_format=json_batch"
          headers     => ["authtoken", "<ZE_LOG_COLLECTOR_TOKEN>"]
        }
      }
    }
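    The <SOME_CONDITION_IS_TRUE> placeholder stands for whatever conditional routes only Zebrium-bound events to this output. As one illustration (mirroring the complete example at the end of this document), a metadata flag set in your filter section can serve as the condition:

```
output {
  if [@metadata][zebrium] {
    http {
      format      => "json_batch"
      http_method => "post"
      url         => "<ZE_LOG_COLLECTOR_URL>/log/api/v2/ingest?log_source=logstash&log_format=json_batch"
      headers     => ["authtoken", "<ZE_LOG_COLLECTOR_TOKEN>"]
    }
  }
}
```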
    
  3. SAVE YOUR CONFIGURATION FILE

Reload Logstash Configuration

Reload your Logstash configuration to pick up all of the changes. Data will now be ingested into Zebrium.
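
Logstash offers two common ways to reload configuration; both are sketched below (the config path and PID are illustrative):

```shell
# Option 1: run Logstash with automatic config reloading enabled
bin/logstash -f /etc/logstash/conf.d --config.reload.automatic

# Option 2: force a running Logstash instance to reload its config by sending SIGHUP
kill -SIGHUP <logstash-pid>
```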

Complete example for filebeat, winlogbeat and metricbeat data

It is highly recommended that you read this example carefully and follow the sample provided.

input {
  beats {
    port => 5044
  }
}
 
filter {
 
  #--------------------------------------------#
  # Add the UUID to all events before          #
  # cloning a copy for the Zebrium-only fields #
  #--------------------------------------------#
  uuid {
    target => "@ze_xid"  # Generate a Unique ID and assign to @ze_xid
  }
 
  #---------------------------------------------#
  # Make a clone of the message so we only send #
  # Zebrium add-ons to Zebrium and not to other #
  # existing outputs like elastic               #
  #---------------------------------------------#
  clone {
    clones => ['zebrium']
  }
 
  #------------------------------------#
  # Add Zebrium specifics to the clone #
  #------------------------------------#
  if( [type] == 'zebrium' ) {
    #--------------------------------------------------------------#
    # Common attributes across metricbeats, filebeats, winlogbeats #
    #--------------------------------------------------------------#
    mutate {
      add_field => { "[@metadata][zebrium]" => true }
    }
    mutate {
      add_field => { "@ze_deployment_name" => "mydeployment01"  }
    }
    if( [host][hostname] ) {
      mutate {
        add_field => { "@ze_host" => "%{[host][hostname]}" }
      }
    } else if ( [host][name] ) {
      mutate {
        add_field => { "@ze_host" => "%{[host][name]}" }
      }
    }
    if( [@ze_host] ) {
      mutate {
        gsub => [ "@ze_host", "^([^.]+)\..*$", "\1" ] # Use the hostname without the fully qualified domain
      }
    } else {
      mutate {
        add_field => { "@ze_host" => "unknown" }
      }
    }
 
    #------------------------------#
    # metricbeat specific captures #
    #------------------------------#
    if( [agent][type] and [agent][type] == "metricbeat" ) {
      if( [system] ) {
        mutate {
          copy => { "[system]" => "@ze_stats" } # Zebrium needs deep copy of metrics defined as @ze_stats
        }
      } else if( [error][message] ) {
        # Metricbeat failed to collect data - capture the error as a regular log event
        mutate {
          add_field => { "@ze_msg"     => "%{[error][message]}" }
          add_field => { "@ze_time"    => "%{@timestamp}" }
          add_field => { "@ze_sev"     => "ERROR" }
          add_field => { "@ze_logtype" => "metricbeat" }
        }
      }
    }
 
    #------------------------------#
    # winlogbeat specific captures #
    #------------------------------#
    if( [agent][type] and [agent][type] == "winlogbeat" ) {
      if( [log][level] ) {
        mutate {
          add_field => { "@ze_sev" => "%{[log][level]}" }
        }
      }
      if( [message] ) {
        mutate {
          add_field => { "@ze_msg"  => "%{[message]}"  }
          add_field => { "@ze_time" => "%{@timestamp}" }
        }
      }
      if( [event][provider] ) {
        mutate {
          add_field => { "@ze_logtype" => "%{[event][provider]}" }
        }
      } else if( [event][module] ) {
        mutate {
          add_field => { "@ze_logtype" => "%{[event][module]}" }
        }
      } else {
        mutate {
          add_field => { "@ze_logtype" => "winlogbeat" }
        }
      }
      if [@ze_logtype] and [@ze_logtype] =~ "^Microsoft\-Windows\-" {
        # Sometimes the provider name starts with Microsoft-Windows-, so strip that extraneous prefix and use the remainder as the logtype
        mutate {
          gsub => [ "@ze_logtype", "^Microsoft\-Windows\-(.*)$", "\1" ]
        }
      }
    }
    #----------------------------#
    # filebeat specific captures #
    #----------------------------#
    if( [agent][type] and [agent][type] == "filebeat" ) {
      if( [message] ) {
        mutate {
          add_field => { "@ze_msg" => "%{[message]}" }
        }
      }
      if( [log][file][path] ) {
        grok {
          match => [ "[log][file][path]","%{GREEDYDATA}[\\/]%{GREEDYDATA:logtype}\.log" ]
        }
        mutate {
          add_field    => { "@ze_logtype" => "%{logtype}" }
          remove_field => [ "logtype" ]
        }
        mutate {
          # Sometimes the log filename starts with the hostname, remove that so all logs of the same type are grouped together
          gsub => [ "@ze_logtype", "^%{@ze_host}([^\d]+).*$", "\1" ]
        }
      } else {
        mutate {
          add_field => { "@ze_logtype" => "filebeatlog" }
        }
      }
    }
  } # END OF ZEBRIUM
}
 
output {
  # SEND ZEBRIUM DATA TO ZEBRIUM ONLY
  if [@metadata][zebrium] {
    http {
        format      => "json_batch"
        http_method => "post"
        url         => "<url>"
        headers     => ["authtoken", "<token>"]
        proxy       => "<proxy>"
    }
  # THEN SEND DATA AS WAS DONE BEFORE ADDING ZEBRIUM
  } else if [@metadata][pipeline] {
    elasticsearch {
        hosts => ["https://localhost:9200"]
        index => "%{[@metadata][beat]}-%{[@metadata][version]}"
        pipeline => "%{[@metadata][pipeline]}"
        ssl => true
        ssl_certificate_verification => true
        cacert => '/etc/logstash/certs/ca.crt'
        user => "elastic"
        password => "${ES_PW}"
    }
  } else {
    elasticsearch {
        hosts => ["https://localhost:9200"]
        index => "%{[@metadata][beat]}-%{[@metadata][version]}"
        pipeline => "beats"
        ssl => true
        ssl_certificate_verification => true
        cacert => '/etc/logstash/certs/ca.crt'
        user => "elastic"
        password => "${ES_PW}"
    }
  }
}
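
The gsub rewrites in the example above are plain regular-expression substitutions. As a quick illustration (in Python, purely for clarity; Logstash field handling is simplified away), the intended hostname and Windows provider rewrites behave like this:

```python
import re

def short_hostname(fqdn):
    """Strip the domain portion of an FQDN, keeping only the short host name."""
    return re.sub(r"^([^.]+)\..*$", r"\1", fqdn)

def provider_logtype(provider):
    """Drop a leading 'Microsoft-Windows-' prefix from a Windows event provider name."""
    return re.sub(r"^Microsoft-Windows-(.*)$", r"\1", provider)

print(short_hostname("mars.example.com"))            # mars
print(provider_logtype("Microsoft-Windows-Sysmon"))  # Sysmon
```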