Configuring Logstash to Send Log Data to Zebrium

In Zebrium

Retrieve your Zebrium URL and Auth Token for Configuring the Logstash HTTP Output Plugin

  1. Log in to your Zebrium portal user account.
  2. From the User menu area in Zebrium, click the Settings (hamburger) menu.
  3. Select Integrations and Collectors.
  4. Click Other.
  5. Note the ZE_LOG_COLLECTOR_URL and ZE_LOG_COLLECTOR_TOKEN values and store them for later use when configuring Logstash below.

In Logstash

Zebrium Required Fields (in Logstash)

Zebrium requires that certain fields (keys) be defined for each log event. These definitions are part of the "filter" section in the Logstash configuration.

Zebrium uses four types of fields, defined in the Logstash filter configuration, for proper Incident detection. An example Logstash configuration is shown below the table:

| Type | Description | Key Name | Key Definition | Requirement |
|------|-------------|----------|----------------|-------------|
| Time | Timestamp/time zone of each log event. | @timestamp | Timestamp of each log event (rather than the time the event was processed by Logstash, if possible). | Required |
| | | @ze_timezone | Time zone of each log event, e.g. "America/Los_Angeles". Note: UTC is the default. | Optional |
| Log Generator | Indicates the source of the log event. | @ze_deployment_name | Identifies the environment or application domain, e.g. "production", "dev", "acme_calendar_app". In the Zebrium UI this is known as the Service Group (see the note on Service Groups below). | Recommended |
| | | @ze_host | Host name identifier. | Required |
| | | @ze_logtype | The basename of the log source, e.g. "access.log", "syslog". In the Zebrium UI, it will be the logtype. In the container world, this would probably be the app name. | Required |
| Log Events Wrapped in JSON | If the application or host log events are simply wrapped in JSON and contain a field like "message" : "2020-10-23 04:17:37 mars INFO systemd[1]: Stopped PostgreSQL RDBMS.", then these keys need to be defined. | @ze_msg | If the JSON contains a field representing a typical "log event" of the form <PREFIX INFORMATION> <EVENT TEXT>, then this Zebrium key should be set to the value of that "log event". Zebrium's machine learning will then structure this field into an Event Type (etype) used for Incident detection. | Required (if your log events are wrapped in JSON) |
| | | @ze_sev | If @ze_msg does not contain a severity, then this field can be used to explicitly set the severity based on some other criteria or field from the payload. | Optional |
| External ID Mapping | Map events in Zebrium to corresponding events in Elasticsearch. | @ze_xid | Assign a unique ID (UUID) to every log event so that events in Zebrium can be mapped to corresponding events in Elasticsearch through a common UUID. | Required (if using Kibana/Elasticsearch to view Zebrium Incidents) |
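
The requirements listed above can also be checked inside the pipeline itself. The following is a minimal sketch, not part of the Zebrium instructions: it tags any event that lacks @ze_host or @ze_logtype. The tag name ze_missing_required is a hypothetical name chosen for illustration, and the block assumes it is placed after the filters that add the @ze_ fields.

```logstash
filter {
  # Assumption: this block runs after the filters that add the @ze_* fields.
  # "ze_missing_required" is a hypothetical tag name used only for illustration.
  if ![@ze_host] or ![@ze_logtype] {
    mutate {
      add_tag => [ "ze_missing_required" ]
    }
  }
}
```

Events carrying the tag can then be inspected or routed separately before they are sent to Zebrium.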

Service Groups

A Service Group defines a failure domain boundary for anomaly correlation. It lets you collect logs from multiple applications while keeping each application's logs isolated, so that they are not mixed in a Root Cause Report. If you are uploading multiple logs from different services in the same application, specify the same service group for each log event from that application. For example, suppose you have a database log, an application log and a middleware log for the Acme Calendar application. You would use one service group, such as acme_calendar_app, when uploading all logs from that application.
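
As a sketch of the Acme Calendar example, the filter below assigns one service group to events from all three logs. It assumes each event carries a logtype field whose value is acme_db, acme_app or acme_middleware. That field name and those values are hypothetical, so substitute whatever identifies your own logs.

```logstash
filter {
  # Hypothetical: "logtype" and its three values stand in for whatever
  # field identifies the database, application and middleware logs.
  if [logtype] in [ "acme_db", "acme_app", "acme_middleware" ] {
    mutate {
      # The same value for all three logs places them in one Service Group.
      add_field => { "@ze_deployment_name" => "acme_calendar_app" }
    }
  }
}
```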

Configuring Logstash Filters for Zebrium Required Fields (in Logstash)

  1. Edit the appropriate Logstash configuration file to add the filter definitions that Zebrium requires. These definitions go inside the filter { } section of the configuration, except for the multiline codec, which is set on an input plugin.

  2. TIME FIELDS
    • @timestamp should contain the timestamp from the log event (not the timestamp when processed by Logstash). This is important for proper Incident detection in Zebrium.
    • Processing multi-line events should be enabled such that child log event lines are concatenated to the parent event with newlines.
    • The following shows an example configuration for meeting these requirements.
    #-------------------------------------------------------------------#
    # Codec definition for processing multi-line events (if needed).    #
    # Unlike the filters below, this goes inside an input plugin block. #
    #-------------------------------------------------------------------#
    codec => multiline {
      pattern => "^%{TIMESTAMP_ISO8601}"
      negate  => true
      what    => "previous"
    }
    
    #------------------------------------------------------------------------------------------#
    # Grok and Date Filter for capturing log event timestamp in @timestamp                     #
    # If it is not possible to easily capture the event timestamp as @timestamp as shown here, #
    # it is OK to leave @timestamp as-is (i.e. use the logstash generated timestamp)           # 
    #------------------------------------------------------------------------------------------#
    grok {
      match => [ "message", "(?m)%{TIMESTAMP_ISO8601:logdate}" ] # Note the multi-line capture pattern (?m)
    }
    date {
      # This will set @timestamp
      match        => [ "logdate", "yyyy-MM-dd HH:mm:ss,SSS", "yyyy-MM-dd HH:mm:ss" ]
      timezone     => "America/Los_Angeles"
      remove_field => ["logdate"]
    }
    
    #---------------------------------------#
    # Capture @ze_timezone                  #
    # If not specified, UTC will be assumed #
    #---------------------------------------#
    mutate {
        add_field => { "@ze_timezone" => "America/Los_Angeles" }  # Specify the time zone (IANA TZ name) if your log timestamps lack time zone info; otherwise UTC is assumed (optional).
    }
    
  3. LOG GENERATOR FIELDS

    #-----------------------------------------------------------------#
    # Mutate Filter for capturing deployment name, host and logtype   #
    # PLEASE READ CAREFULLY - YOU MUST SUBSTITUTE THE                 #
    # RIGHT-HAND SIDE OF THE ASSIGNMENTS WITH YOUR FIELD NAMES/VALUES #
    #-----------------------------------------------------------------#
    mutate {
       add_field => { "@ze_deployment_name" => "%{my_deployment}"  } # assumes field "my_deployment"  is part of the payload (recommended)
       add_field => { "@ze_host"            => "%{host}"           } # assumes field "host"           is part of the payload (required)
       add_field => { "@ze_logtype"         => "%{logtype}"        } # assumes field "logtype"        is part of the payload (required)
    }
    
  4. LOG EVENTS WRAPPED IN JSON FIELDS

    • This configuration is required if you have a "message" field in the JSON containing an unstructured log event. In that case, Zebrium structures the message and automatically creates an Event Type for Incident detection.
    #-----------------------------------------------------------------#
    # Required if your log events are wrapped in JSON                 #
    # PLEASE READ CAREFULLY - YOU MUST SUBSTITUTE THE                 #
    # RIGHT-HAND SIDE OF THE ASSIGNMENTS WITH YOUR FIELD NAMES/VALUES #
    #-----------------------------------------------------------------#
    mutate {
        add_field => { "@ze_msg"  => "%{message}"         } # Capture the unstructured log event from the message field - Zebrium will automatically structure this into an etype (required)
        add_field => { "@ze_sev"  => "%{[log][severity]}" } # Capture the severity explicitly since "message" field does not contain severity (optional)
        add_field => { "@ze_pfx"  => "%{[log][process]}"  } # Capture the process name and add it to the log event prefix so it's part of the automatic structuring (optional)
    }
    
  5. EXTERNAL ID MAPPING FIELD

    • Note: This is not part of a mutate filter
    uuid {
      target => "@ze_xid"  # Generate a Unique ID and assign to @ze_xid 
    }
    
  6. SAVE YOUR CONFIGURATION FILE
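
One caveat with the mutate examples above: when a referenced field is absent from an event, Logstash leaves the %{...} reference in place as literal text rather than substituting an empty value. A minimal sketch of guarding the optional keys, assuming the same [log][severity] and [log][process] payload fields used in step 4:

```logstash
filter {
  # Add @ze_sev only when the payload actually carries a severity,
  # so the literal text "%{[log][severity]}" is never sent.
  if [log][severity] {
    mutate {
      add_field => { "@ze_sev" => "%{[log][severity]}" }
    }
  }
  # Same guard for the optional prefix field.
  if [log][process] {
    mutate {
      add_field => { "@ze_pfx" => "%{[log][process]}" }
    }
  }
}
```

The same guard can be applied to any field reference whose source field may be missing from some events.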

Configuring Log Event Output to Zebrium (in Logstash)

  1. Edit the appropriate Logstash configuration file to add the output definition that Zebrium requires.
  2. Add the following output definition for Zebrium, substituting ZE_LOG_COLLECTOR_URL and ZE_LOG_COLLECTOR_TOKEN with the values noted in Step 5 of "Retrieve your Zebrium URL and Auth Token for Configuring the Logstash HTTP Output Plugin" above.

    output {
      if <SOME_CONDITION_IS_TRUE> {
        http {
          format      => "json_batch"
          http_method => "post"
          url         => "<ZE_LOG_COLLECTOR_URL>/log/api/v2/ingest?log_source=logstash&log_format=json_batch"
          headers     => ["authtoken", "<ZE_LOG_COLLECTOR_TOKEN>"]
        }
      }
    }
    
  3. SAVE YOUR CONFIGURATION FILE
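
To avoid hardcoding the token in the configuration file, the values can be supplied through environment variables, which Logstash substitutes using the ${VAR} syntax. This is a sketch under two assumptions: that ZE_LOG_COLLECTOR_URL and ZE_LOG_COLLECTOR_TOKEN are exported in the environment of the Logstash process, and that checking for @ze_host and @ze_logtype is a suitable condition for your pipeline.

```logstash
output {
  # Assumed condition: forward only events that carry the required keys.
  if [@ze_host] and [@ze_logtype] {
    http {
      format      => "json_batch"
      http_method => "post"
      # ${...} is resolved from the environment when the configuration is loaded.
      url         => "${ZE_LOG_COLLECTOR_URL}/log/api/v2/ingest?log_source=logstash&log_format=json_batch"
      headers     => ["authtoken", "${ZE_LOG_COLLECTOR_TOKEN}"]
    }
  }
}
```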

Reload Logstash Configuration

Reload your Logstash configuration to pick up all changes (see the Logstash documentation on reloading the configuration). Data will then be ingested into Zebrium.

Complete example for filebeat and winlogbeat data

It is highly recommended that you read this example carefully and follow it.

input {
  beats {
    port => 5044
  }
}
 
filter {
 
  #--------------------------------------------#
  # Add the UUID to all events before          #
  # cloning a copy for the zebrium only fields #
  #--------------------------------------------#
  uuid {
    target => "@ze_xid"  # Generate a Unique ID and assign to @ze_xid
  }
 
  #---------------------------------------------#
  # Make a clone of the message so we only send #
  # Zebrium add-ons to Zebrium and not to other #
  # existing outputs like elastic               #
  #---------------------------------------------#
  clone {
    clones => ['zebrium']
  }
 
  #------------------------------------#
  # Add Zebrium specifics to the clone #
  #------------------------------------#
  if( [type] == 'zebrium' ) {
    #--------------------------------------------------#
    # Common attributes across filebeat and winlogbeat #
    #--------------------------------------------------#
    mutate {
      add_field => { "[@metadata][zebrium]" => true }
    }
    mutate {
      add_field => { "@ze_deployment_name" => "mydeployment01"  }
    }
    if( [host][hostname] ) {
      mutate {
        add_field => { "@ze_host" => "%{[host][hostname]}" }
      }
    } else if ( [host][name] ) {
      mutate {
        add_field => { "@ze_host" => "%{[host][name]}" }
      }
    }
    if( [@ze_host] ) {
      mutate {
        gsub => [ "@ze_host", "^([^.]+).*$", "\1" ] # Keep only the first label of the hostname (drop the domain part)
      }
    } else {
      mutate {
        add_field => { "@ze_host" => "unknown" }
      }
    }
 
    #------------------------------#
    # winlogbeat specific captures #
    #------------------------------#
    if( [agent][type] and [agent][type] == "winlogbeat" ) {
      if( [log][level] ) {
        mutate {
          add_field => { "@ze_sev" => "%{[log][level]}" }
        }
      }
      if( [message] ) {
        mutate {
          add_field => { "@ze_msg"  => "%{[message]}"  }
          add_field => { "@ze_time" => "%{@timestamp}" }
        }
      }
      if( [event][provider] ) {
        mutate {
          add_field => { "@ze_logtype" => "%{[event][provider]}" }
        }
      } else if( [event][module] ) {
        mutate {
          add_field => { "@ze_logtype" => "%{[event][module]}" }
        }
      } else {
        mutate {
          add_field => { "@ze_logtype" => "winlogbeat" }
        }
      }
      if [@ze_logtype] and [@ze_logtype] =~ "^Microsoft\-Windows\-" {
        # Sometimes the provider starts with Microsoft-Windows-, so strip that extraneous prefix and use the remainder as the logtype
        mutate {
          gsub => [ "@ze_logtype", "^Microsoft\-Windows\-(.*)$", "\1" ]
        }
      }
    }
    #----------------------------#
    # filebeat specific captures #
    #----------------------------#
    if( [agent][type] and [agent][type] == "filebeat" ) {
      if( [message] ) {
        mutate {
          add_field => { "@ze_msg" => "%{[message]}" }
        }
      }
      if( [log][file][path] ) {
        grok {
          match => [ "[log][file][path]","%{GREEDYDATA}[\\/]%{GREEDYDATA:logtype}\.log" ]
        }
        mutate {
          add_field    => { "@ze_logtype" => "%{logtype}" }
          remove_field => [ "logtype" ]
        }
        mutate {
          # Sometimes the log filename starts with the hostname, remove that so all logs of the same type are grouped together
          gsub => [ "@ze_logtype", "^%{@ze_host}([^\d]+).*$", "\1" ]
        }
      } else {
        mutate {
          add_field => { "@ze_logtype" => "filebeatlog" }
        }
      }
    }
  } # END OF ZEBRIUM
}
 
output {
  # SEND ZEBRIUM DATA TO ZEBRIUM ONLY
  if [@metadata][zebrium] {
    http {
        format      => "json_batch"
        http_method => "post"
        url         => "<ZE_LOG_COLLECTOR_URL>/log/api/v2/ingest?log_source=logstash&log_format=json_batch"
        headers     => ["authtoken", "<ZE_LOG_COLLECTOR_TOKEN>"]
        proxy       => "<proxy>" # Optional: remove this line if you do not send through an HTTP proxy
    }
  # THEN SEND DATA AS WAS DONE BEFORE ADDING ZEBRIUM
  } else if [@metadata][pipeline] {
    elasticsearch {
        hosts => ["https://localhost:9200"]
        index => "%{[@metadata][beat]}-%{[@metadata][version]}"
        pipeline => "%{[@metadata][pipeline]}"
        ssl => true
        ssl_certificate_verification => true
        cacert => '/etc/logstash/certs/ca.crt'
        user => elastic
        password => "${ES_PW}"
    }
  } else {
    elasticsearch {
        hosts => ["https://localhost:9200"]
        index => "%{[@metadata][beat]}-%{[@metadata][version]}"
        pipeline => beats
        ssl => true
        ssl_certificate_verification => true
        cacert => '/etc/logstash/certs/ca.crt'
        user => elastic
        password => "${ES_PW}"
    }
  }
}