I installed Filebeat 5.0 on my app server and have 3 Filebeat prospectors. Each prospector points to a different log path, and all of them output to one Kafka topic called myapp_applog; everything works fine.
My Filebeat output configuration to one topic - Working
output.kafka:
    # initial brokers for reading cluster metadata
    hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]
    # message topic selection + partitioning
    topic: 'myapp_applog'
    partition.round_robin:
      reachable_only: false
    required_acks: 1
    compression: gzip
    max_message_bytes: 1000000
What I want to do is send each of the log files to a separate topic based on a condition (see the documentation section on topics). I have tried to do it, but no data is being sent to any of the topics. Does anyone know why my condition does not match, or whether it is correct? I can't seem to find an example of how to correctly use the topics condition.
Here is my Kafka output configuration for multiple topics.
Not Working
output.kafka:
    # initial brokers for reading cluster metadata
    hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]
    # message topic selection + partitioning
    topics:
      - topic: 'myapp_applog'
        when: 
          equals:
            document_type: applog_myappapi
      - topic: 'myapp_applog_stats'
        when:
          equals:
            document_type: applog_myappapi_stats
      - topic: 'myapp_elblog'
        when:
          equals:
            document_type: elblog_myappapi
    partition.round_robin:
      reachable_only: false
    required_acks: 1
    compression: gzip
    max_message_bytes: 1000000
Here is full filebeat.yml configuration file.
################### Filebeat Configuration Example #########################
############################# Filebeat ######################################
filebeat.prospectors:
    # App logs - prospector
    - input_type: log
      paths:
        - /myapp/logs/myapp.log
      exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
      exclude_files: [".gz$", ".tmp"]
      fields:
        api: myappapi
        environment: STG
      ignore_older: 24h
      document_type: applog_myappapi
      scan_frequency: 1s
      # Multiline on timestamp, YYYY-MM-DD
      # https://www.elastic.co/guide/en/beats/filebeat/master/multiline-examples.html 
      multiline:
        pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
        negate: true
        match: after
        max_lines: 500
        timeout: 5s
    # Server Stats - prospector
    - input_type: log
      paths:
        - /myapp/logs/serverstats.log
      # Exclude messages with log level
      exclude_lines: [".+? ERROR[^*].+", ".+? DEBUG[^*].+"]
      exclude_files: [".gz$", ".tmp"]
      fields:
        api: myappapi
        environment: STG
      ignore_older: 24h
      document_type: applog_myappapi_stats
      scan_frequency: 1s
    # ELB prospector
    - input_type: log
      paths:
        - /var/log/httpd/elasticbeanstalk-access_log
      document_type: elblog_myappapi
      fields:
        api: myappapi
        environment: STG
      exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
      exclude_files: [".gz$", ".tmp"]
      ignore_older: 24h
      # If set to 0s, scanning is done as often as possible. Default: 10s
      scan_frequency: 1s
registry_file: /var/lib/filebeat/registry
############################# Output ##########################################
# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
#----------------------------- Kafka output --------------------------------
output.kafka:
    # initial brokers for reading cluster metadata
    hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]
    # message topic selection + partitioning
    topics:
      - topic: 'myapp_applog'
        when: 
          equals:
            document_type: applog_myappapi
      - topic: 'myapp_applog_stats'
        when:
          equals:
            document_type: applog_myappapi_stats
      - topic: 'myapp_elblog'
        when:
          equals:
            document_type: elblog_myappapi
    partition.round_robin:
      reachable_only: false
    required_acks: 1
    compression: gzip
    max_message_bytes: 1000000
############################# Logging #########################################
# There are three options for the log output: syslog, file, stderr.
# Under Windows systems, the log files are per default sent to the file output,
# under all other systems per default to syslog.
logging:
  # Send all logging output to syslog. On Windows default is false, otherwise
  # default is true.
  to_syslog: true
  # Write all logging output to files. Beats automatically rotate files if rotateeverybytes
  # limit is reached.
  to_files: true
  # To enable logging to files, to_files option has to be set to true
  files:
    # The directory where the log files will be written to.
    path: /var/log/
    # The name of the files where the logs are written to.
    name: filebeats.log
    # Configure log file size limit. If limit is reached, log file will be
    # automatically rotated
    rotateeverybytes: 10485760 # = 10MB
    # Number of rotated log files to keep. Oldest files will be deleted first.
    keepfiles: 7
  # Enable debug output for selected components. To enable all selectors use ["*"]
  # Other available selectors are beat, publish, service
  # Multiple selectors can be chained.
  #selectors: ["*" ]
  # Sets log level. The default log level is error.
  # Available log levels are: critical, error, warning, info, debug
  level: info
I've got the same problem and dealt with it by defining the output as:
output.kafka:
  topics:
    - topic: '%{[type]}'
  use_type: true
and on the input side you only have to set document_type to the Kafka topic name in each prospector:
- input_type: log
  paths: [ ... ]
  document_type: "your first Kafka topic"
- input_type: log
  paths: [ ... ]
  document_type: "your second Kafka topic"
Input:
- type: log
  fields:
    kafka_topic: "my_topic_1"
- type: log
  fields:
    kafka_topic: "my_topic_2"
Output:
output.kafka:
  hosts: ["mybroker:9092"]
  topic: '%{[fields.kafka_topic]}'
The example above shows two log inputs routed to two Kafka topics.
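One note on this approach: custom entries declared under fields are published under the fields key by default (unless fields_under_root: true is set on the input), which is why the format string is '%{[fields.kafka_topic]}' rather than '%{[kafka_topic]}'. If events still aren't being routed, running Filebeat in the foreground with the publish debug selector shows exactly which fields each event carries (adjust the -c path to your config):
filebeat -e -c /etc/filebeat/filebeat.yml -d "publish"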