跳至主要內容

记一个 Logstash 多源和多目标问题

Mr.Chen大约 1 分钟文章LogstashLinux

背景

最近新起了一套开发环境,需要从 Mysql 初始化数据到 ElasticSearch,以为重新新增一个 config 文件,配置新的 input 和 output 即可。

谁知经过测试发现,Logstash 启动之后,自动将第一个配置文件的 input 传输到两个 output 去了。

才知道,分开多个 config 文件,并没其他隔离作用

实现方式

后来在网上搜了下,发现有两种方法实现。

通过 filter 进行分开输出

#第一个输入
input {
  kafka {
    codec => json
    bootstrap_servers => "IP地址:端口号"
    group_id => "消费者组"
    client_id => "test"
    topics => ["test"]
    type => "apple"
    auto_offset_reset => "latest"
  }
}
#第二个输入
input {
  kafka {
    codec => json
    bootstrap_servers => "IP地址:端口号"
    group_id => "消费者组"
    client_id => "test2"
    topics => ["test2"]
    type => "banana"
    auto_offset_reset => "latest"
  }
}
#过滤
filter {
  if[type] == "apple" {
    mutate {
      add_tag => ["apple"]
    }
  }
    
  if [type] == "banana" {
    mutate {
      add_tag => ["banana"]
    }
  } 
}
#输出     
output {
  stdout { codec => json }
  
  if "apple" in [tags] {
    elasticsearch {
      index => "apple_index"
      hosts => ["IP地址:端口号"]
    } 
  }
    
  if "banana" in [tags] {
    elasticsearch {
      index => "banana_index"
      hosts => ["IP地址:端口号"]
    } 
  } 
}

通过 pipeline 进行分开输出

pipelines.yml文件配置

- pipeline.id: apple
  path.config: "/etc/logstash/conf.d/apple.conf"
- pipeline.id: banana
  path.config: "/etc/logstash/conf.d/banana.conf"

apple.conf

 input {
      kafka {
        codec => json
        bootstrap_servers => "IP地址:端口号"
        group_id => "消费者组"
        client_id => "test"
        topics => ["test"]
        auto_offset_reset => "latest"
      }
}
output {
    stdout { codec => json }
    
   elasticsearch {
        index => "apple_index"
        hosts => ["IP地址:端口号"]
   } 
}

banana.conf

 input {
      kafka {
        codec => json
        bootstrap_servers => "IP地址:端口号"
        group_id => "消费者组"
        client_id => "test"
        topics => ["test"]
        auto_offset_reset => "latest"
      }
}
output {
    stdout { codec => json }
    
   elasticsearch {
        index => "banana_index"
        hosts => ["IP地址:端口号"]
   } 
}