Creating parsers
This guide assumes you are writing the parser from the test environment.
Base parser file
A simple parser can be defined as :
filter: 1 == 1
debug: true
onsuccess: next_stage
name: me/myparser
description: a cool parser for my service
grok:
  #our grok pattern : capture .*
  pattern: ^%{DATA:some_data}$
  #the field to which we apply the grok pattern : the log message itself
  apply_on: message
statics:
  - parsed: is_my_service
    value: yes
- a filter : if the expression is true, the event will enter the parser, otherwise it won't
- an onsuccess : defines what happens when the event was successfully parsed : shall we continue ? shall we move to the next stage ? etc.
- a name & a description
- some statics that will modify the event
- a debug flag that enables local debugging information
- a grok pattern to capture some data in logs
We are going to use the following sample log as an example (x.log) :
May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0
May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0
Trying our mock parser
Your parser YAML file must be in the config/parsers/s01-parse/ directory. The stage directory might not exist yet; if it doesn't, create it.
Let's try it:
./crowdsec -c ./dev.yaml -dsn file://x.log -type foobar
Expected output
INFO[20-08-2021 17:18:20] Crowdsec v1.1.1-linux-73e0bbaf93070f4a640eb5a22212b5dcf26699de
INFO[20-08-2021 17:18:21] reading x.log at once type="file://x.log"
DEBU[20-08-2021 17:18:21] + Grok '^%{DA...' returned 1 entries to merge in Parsed id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] .Parsed['some_data'] = 'May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0 ' id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] + Processing 1 statics id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] .Parsed[is_my_service] = 'yes' id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] Event leaving node : ok id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] move Event from stage s01-parse to s02-enrich id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] + Grok '^%{DA...' returned 1 entries to merge in Parsed id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] .Parsed['some_data'] = 'May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0' id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] + Processing 1 statics id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] .Parsed[is_my_service] = 'yes' id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] Event leaving node : ok id=billowing-flower name=me/myparser stage=s01-parse
DEBU[20-08-2021 17:18:21] move Event from stage s01-parse to s02-enrich id=billowing-flower name=me/myparser stage=s01-parse
...
We can see our "mock" parser is working. Let's see what happened :
- The event enters the node
- The filter returned true (1 == 1), so the event will be processed
- Our grok pattern (just a .* capture) "worked" and captured data (the whole line, actually)
- The grok captures (under the name some_data) are merged into the .Parsed map of the event
- The statics section is processed, and .Parsed[is_my_service] is set to yes
- The event leaves the parser successfully, and because onsuccess is set to next_stage, the event moves to the next stage
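The sequence above can be sketched in a few lines of Python. This is only an illustration of the order of operations, not CrowdSec's implementation: the function name run_mock_parser, the event dictionary layout and the outcome strings are hypothetical, %{DATA:some_data} is approximated by a lazy named group, and the sample line is shortened.

```python
import re

# Hypothetical, simplified stand-in for one parser node (not CrowdSec code).
# %{DATA:some_data} is approximated by a lazy named group.
MOCK_PATTERN = re.compile(r"^(?P<some_data>.*?)$")


def run_mock_parser(event):
    """Apply the mock parser's steps to an event dict, in the order listed above."""
    # 1. filter: "1 == 1" is always true, so every event enters the node
    filter_ok = (1 == 1)
    if not filter_ok:
        return event, "filtered_out"

    # 2. grok: apply the pattern to the raw log line (apply_on: message)
    match = MOCK_PATTERN.match(event["message"])
    if match is None:
        return event, "grok_failed"

    # 3. merge the named captures into the Parsed map
    event["Parsed"].update(match.groupdict())

    # 4. statics: set Parsed[is_my_service] to "yes"
    event["Parsed"]["is_my_service"] = "yes"

    # 5. onsuccess: next_stage, so the event moves on
    return event, "next_stage"


# Shortened version of the first sample line
line = "May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= SRC=99.99.99.99"
event, outcome = run_mock_parser({"message": line, "Parsed": {}})
print(outcome)
print(event["Parsed"])
```

Because the pattern captures everything, some_data ends up holding the whole line, which is what the debug output shows.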
Writing the GROK pattern
We are going to write a parser for iptables logs, which look like this :
May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0
May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0
Using an online grok debugger or an online regex debugger, we come up with the following grok pattern :
\[%{DATA}\]+.*(%{WORD:action})? IN=%{WORD:int_eth} OUT= MAC=%{IP}:%{MAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{INT:length}.*PROTO=%{WORD:proto} SPT=%{INT:src_port} DPT=%{INT:dst_port}.*
Before writing your own, check whether the pattern you need is already present in the patterns configuration.
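If no grok debugger is at hand, the pattern can be roughly approximated with a plain regular expression to check which fields it extracts. The sketch below is an assumption-laden translation, not the real grok definitions: IP is reduced to IPv4, WORD to \w+, INT to \d+, DATA to a lazy .*?, and the whole MAC= field (matched by %{IP}:%{MAC} in the grok pattern) to a run of hex digits and colons.

```python
import re

# Simplified stand-ins for the grok building blocks (assumptions, not the
# real grok definitions): IPv4 only, and the MAC= field as hex digits + colons.
IPV4 = r"\d{1,3}(?:\.\d{1,3}){3}"
WORD = r"\w+"
INT = r"\d+"

IPTABLES_RE = re.compile(
    r"\[.*?\]+.*"
    rf"(?P<action>{WORD})? IN=(?P<int_eth>{WORD}) OUT= MAC=[0-9a-fA-F:]+"
    rf" SRC=(?P<src_ip>{IPV4}) DST=(?P<dst_ip>{IPV4})"
    rf" LEN=(?P<length>{INT}).*PROTO=(?P<proto>{WORD})"
    rf" SPT=(?P<src_port>{INT}) DPT=(?P<dst_port>{INT}).*"
)

# First sample line from this guide
SAMPLE = (
    "May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= "
    "MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 "
    "DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP "
    "SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0"
)

match = IPTABLES_RE.search(SAMPLE)
# default="" turns the unmatched optional "action" group into an empty string
parsed = match.groupdict(default="") if match else {}
for key, value in parsed.items():
    print(f"{key} = {value!r}")
```

The optional action group matches nothing on this sample, so it comes back empty while the seven other fields are filled.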
Test our new pattern
Now, let's integrate our GROK pattern within our YAML :
#let's set onsuccess to "next_stage" : if the log is parsed, we can consider it has been dealt with
onsuccess: next_stage
#debug, for reasons (don't do this in production)
debug: true
#for now, let every event in; we will filter on the program name ('kernel') when finalizing the parser
filter: "1 == 1"
#name and description:
name: crowdsecurity/iptables-logs
description: "Parse iptables drop logs"
grok:
  #our grok pattern
  pattern: \[%{DATA}\]+.*(%{WORD:action})? IN=%{WORD:int_eth} OUT= MAC=%{IP}:%{MAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{INT:length}.*PROTO=%{WORD:proto} SPT=%{INT:src_port} DPT=%{INT:dst_port}.*
  #the field to which we apply the grok pattern : the log message itself
  apply_on: message
statics:
  - parsed: is_my_service
    value: yes
./crowdsec -c ./dev.yaml -dsn file://x.log -type foobar
Expected output
INFO[20-08-2021 17:47:46] reading x.log at once type="file://x.log"
DEBU[20-08-2021 17:47:46] + Grok '[%{D...' returned 8 entries to merge in Parsed id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['proto'] = 'TCP' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['src_port'] = '45225' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['dst_port'] = '8888' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['action'] = '' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['int_eth'] = 'enp1s0' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['src_ip'] = '99.99.99.99' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['dst_ip'] = '127.0.0.1' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['length'] = '40' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] + Processing 1 statics id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed[is_my_service] = 'yes' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] Event leaving node : ok id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] move Event from stage s01-parse to s02-enrich id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
WARN[20-08-2021 17:47:46] Acquisition is finished, shutting down
DEBU[20-08-2021 17:47:46] + Grok '\[%{D...' returned 8 entries to merge in Parsed id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['length'] = '60' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['proto'] = 'TCP' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['src_port'] = '53668' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['dst_port'] = '80' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['action'] = '' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['int_eth'] = 'enp1s0' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['src_ip'] = '44.44.44.44' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed['dst_ip'] = '127.0.0.1' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] + Processing 1 statics id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] .Parsed[is_my_service] = 'yes' id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] Event leaving node : ok id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
DEBU[20-08-2021 17:47:46] move Event from stage s01-parse to s02-enrich id=summer-snowflake name=crowdsecurity/iptables-logs stage=s01-parse
...
What changed ? We can now see that the fragments captured by the GROK pattern are merged into the Parsed map !
We now have parsed data, only a few more changes and we will be done :)
Finalizing our parser
#let's set onsuccess to "next_stage" : if the log is parsed, we can consider it has been dealt with
onsuccess: next_stage
#debug, for reasons (don't do this in production)
debug: true
#as seen in our sample log, those logs are processed by the system and have a progname set to 'kernel'
filter: "evt.Parsed.program == 'kernel'"
#name and description:
name: crowdsecurity/iptables-logs
description: "Parse iptables drop logs"
grok:
  #our grok pattern
  pattern: \[%{DATA}\]+.*(%{WORD:action})? IN=%{WORD:int_eth} OUT= MAC=%{IP}:%{MAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{INT:length}.*PROTO=%{WORD:proto} SPT=%{INT:src_port} DPT=%{INT:dst_port}.*
  #the field to which we apply the grok pattern : the log message itself
  apply_on: message
statics:
  - meta: log_type
    value: iptables_drop
  - meta: service
    expression: "evt.Parsed.proto == 'TCP' ? 'tcp' : 'unknown'"
  - meta: source_ip
    expression: "evt.Parsed.src_ip"
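The statics of the finalized parser now target meta fields rather than Parsed entries, and two of them are computed from an expression instead of a fixed value. As a rough Python equivalent (the function name apply_statics and the plain dictionaries are hypothetical, used only to show what each static computes):

```python
def apply_statics(parsed):
    """Build a meta map from a Parsed map, mirroring the three statics above."""
    meta = {}
    # - meta: log_type / value: a fixed string
    meta["log_type"] = "iptables_drop"
    # - meta: service / expression: ternary on the captured protocol
    meta["service"] = "tcp" if parsed.get("proto") == "TCP" else "unknown"
    # - meta: source_ip / expression: copy of the captured source address
    meta["source_ip"] = parsed.get("src_ip", "")
    return meta


# Values taken from the second sample line
tcp_meta = apply_statics({"proto": "TCP", "src_ip": "44.44.44.44", "dst_port": "80"})
# Hypothetical non-TCP event, to show the fallback branch of the ternary
udp_meta = apply_statics({"proto": "UDP", "src_ip": "10.0.0.1"})
print(tcp_meta)
print(udp_meta["service"])
```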
filter
We changed the filter to correctly filter on the program name.
In the current example, our logs are produced by the kernel (netfilter), and thus the program is kernel :
tail -f /var/log/kern.log
May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0
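The filter relies on a program field being available in Parsed by the time the event reaches this parser, presumably extracted from the syslog header by an earlier stage. The following sketch illustrates the idea under that assumption: SYSLOG_RE is a rough, hypothetical approximation of a syslog header (not the pattern CrowdSec uses), the kernel line is shortened, and the sshd line is invented for contrast.

```python
import re

# Rough approximation of a syslog header:
#   "<timestamp> <host> <program>[<pid>]: <message>"
# Illustrative assumption only, not the pattern CrowdSec itself uses.
SYSLOG_RE = re.compile(
    r"^(?P<timestamp>\w{3}\s+\d{1,2} \d{2}:\d{2}:\d{2}) "
    r"(?P<host>\S+) (?P<program>[^:\[\s]+)(?:\[\d+\])?: (?P<message>.*)$"
)


def passes_filter(parsed):
    """Python equivalent of the filter: evt.Parsed.program == 'kernel'."""
    return parsed.get("program") == "kernel"


lines = [
    # shortened version of the sample iptables line
    "May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= SRC=44.44.44.44",
    # hypothetical line from another program, which the filter should reject
    "May 11 16:24:01 sd-126005 sshd[1234]: Connection closed by 10.0.0.1",
]

results = []
for line in lines:
    m = SYSLOG_RE.match(line)
    parsed = m.groupdict() if m else {}
    results.append((parsed.get("program"), passes_filter(parsed)))
    print(results[-1])
```

With such a filter, only events whose program is kernel enter the parser, so the grok pattern is not applied to unrelated log lines.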