It wasn't easy to work out how to solve this, but I finally found the solution:
# Collects the SOAP lines logged by each thread into one event per thread_id.
# Every line matched by grok is folded into the aggregate map and then dropped;
# the map itself is emitted later as a new event carrying thread_id, soap_in
# and soap_out.
filter {
  # Extract the timestamp, the thread id and the SOAP payload. The first
  # pattern captures a payload wrapped in << ... >> as soap_in, the second a
  # payload wrapped in >> ... << as soap_out.
  grok {
    match => {
      "message" => [
        '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message <<(?<soap_in>.*?)>>',
        '%{TIMESTAMP_ISO8601:log_timestamp} thread-%{INT:thread_id} SOAP message >>(?<soap_out>.*?)<<'
      ]
    }
  }

  # Accumulate the payloads per thread_id.
  # NOTE(review): the aggregate plugin documentation warns that
  # push_previous_map_as_event assumes all events of one task arrive before
  # the next task starts. If lines from different threads can interleave,
  # push_map_as_event_on_timeout may be the better fit - confirm against the
  # real log ordering before changing it.
  aggregate {
    task_id => "%{thread_id}"
    code => "
      map['thread_id'] = event.get('thread_id')
      map['soap_in'] ||= []
      map['soap_out'] ||= []

      stamp = event.get('log_timestamp')
      inbound = event.get('soap_in')
      outbound = event.get('soap_out')

      # Each payload is stored together with the timestamp of its own log line.
      map['soap_in'] << {'soap_in' => inbound, 'log_timestamp' => stamp} if inbound
      map['soap_out'] << {'soap_out' => outbound, 'log_timestamp' => stamp} if outbound

      # The source line is always dropped: only the aggregated map is emitted.
      # The earlier guard on map['soap_in'] && map['soap_out'] was always true,
      # because both keys hold arrays and arrays are truthy even when empty.
      event.cancel()
    "
    # Emit the finished map when another thread_id shows up, or once the
    # 3 second timeout expires.
    push_previous_map_as_event => true
    timeout => 3
  }

  # The raw log line is no longer needed once its parts have been extracted.
  mutate {
    remove_field => ["message"]
  }
}