# Logstash pipeline: ingest Suricata EVE JSON events, enrich them with GeoIP
# and reverse-DNS data, and index them into Elasticsearch.

input {
  # Read the Suricata EVE log, decode each line as JSON and tag the events
  # with type "SuricataIDPS" so the filter section can recognise them.
  file {
    path => ["/var/log/suricata/eve.json"]
    # sincedb_path is a single path, so it is given as a plain string.
    sincedb_path => "/var/lib/logstash/since.db"
    codec => json
    type => "SuricataIDPS"
  }
}

filter {
  if [type] == "SuricataIDPS" {
    # Parse the ISO8601 "timestamp" field of the event (the date filter
    # writes the result to @timestamp unless a target is configured).
    date {
      match => [ "timestamp", "ISO8601" ]
    }

    # For fileinfo events, store the part of fileinfo.magic that precedes the
    # first comma in fileinfo.type. The Hash guard skips events that carry no
    # fileinfo object instead of raising NoMethodError inside the filter.
    # NOTE(review): event['field'] is the legacy event accessor; Logstash 5+
    # requires event.get / event.set -- migrate this snippet when upgrading.
    ruby {
      code => "if event['event_type'] == 'fileinfo' && event['fileinfo'].is_a?(Hash); event['fileinfo']['type'] = event['fileinfo']['magic'].to_s.split(',')[0]; end"
    }
  }

  # The enrichment blocks below are not restricted to type "SuricataIDPS":
  # they run for every event that carries the respective address field.

  if [src_ip] {
    # GeoIP lookup of the source address, stored under src_geoip.
    geoip {
      source => "src_ip"
      target => "src_geoip"
      database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
    }
    # Copy the address into src_ip_rdns, then let the dns filter replace that
    # copy with the reverse lookup result; src_ip itself stays untouched.
    mutate {
      add_field => { "src_ip_rdns" => "%{src_ip}" }
    }
    dns {
      reverse => [ "src_ip_rdns" ]
      action => "replace"
    }
  }

  if [dest_ip] {
    # GeoIP lookup of the destination address, stored under dest_geoip.
    geoip {
      source => "dest_ip"
      target => "dest_geoip"
      database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
    }
    # Same copy-then-resolve approach as for the source address.
    mutate {
      add_field => { "dest_ip_rdns" => "%{dest_ip}" }
    }
    dns {
      reverse => [ "dest_ip_rdns" ]
      action => "replace"
    }
  }

  # Disabled: map dest_port/proto to a service name via getservbyport.
  #if [dest_port] and [proto] and ([proto] == "TCP" or [proto] == "UDP"){
  #  ruby {
  #    code => "require 'socket'; event['dest_service'] = Socket::getservbyport(event['dest_port'], event['proto'].downcase)"
  #  }
  #}
}

output {
  # Index into the local Elasticsearch, one index per day, and (re)install
  # the Suricata index template on startup.
  elasticsearch {
    hosts => ["localhost"]
    index => "logstash-suricata-%{+YYYY.MM.dd}"
    template => "/etc/logstash/templates/suricata_template.json"
    template_overwrite => true
  }
}