Skip to content

Commit

Permalink
LOG-2770: optimize fluent conf for throughput
Browse files Browse the repository at this point in the history
  • Loading branch information
jcantrill committed Jul 22, 2022
1 parent e1053c3 commit 7c4de85
Show file tree
Hide file tree
Showing 22 changed files with 280 additions and 1,305 deletions.
2 changes: 1 addition & 1 deletion hack/testing-olm/test-040-eventrouter.sh
Expand Up @@ -152,7 +152,7 @@ espod=$(oc -n $LOGGING_NS get pods -l component=elasticsearch -o jsonpath={.item
os::log::info "Testing Elasticsearch pod ${espod}..."
os::cmd::try_until_text "oc -n $LOGGING_NS exec -c elasticsearch ${espod} -- es_util --query=/ --request HEAD --head --output /dev/null --write-out %{response_code}" "200" "$(( 1*$minute ))"

warn_nonformatted 'infra'
warn_nonformatted 'infra-*'

evpod=$(oc -n $LOGGING_NS get pods -l component=eventrouter -o jsonpath={.items[0].metadata.name})

Expand Down
4 changes: 0 additions & 4 deletions internal/generator/fluentd/conf.go
Expand Up @@ -31,10 +31,6 @@ func Conf(clspec *logging.ClusterLoggingSpec, secrets map[string]*corev1.Secret,
Sources(clspec, clfspec, op),
"Set of all input sources",
},
{
PrometheusMetrics(clfspec, op),
"Section to add measurement, and dispatch to Concat or Ingress pipelines",
},
{
Concat(clfspec, op),
`Concat pipeline section`,
Expand Down
173 changes: 23 additions & 150 deletions internal/generator/fluentd/conf_test.go
Expand Up @@ -116,7 +116,7 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type systemd
@id systemd-input
@label @MEASURE
@label @INGRESS
path '/var/log/journal'
<storage>
@type local
Expand All @@ -134,36 +134,28 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type tail
@id container-input
path "/var/log/pods/**/*.log"
exclude_path ["/var/log/pods/openshift-logging_collector-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log"]
path "/var/log/pods/*/*/*.log"
exclude_path ["/var/log/pods/openshift-logging_collector-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log", "/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.tmp"]
pos_file "/var/lib/fluentd/pos/es-containers.log.pos"
refresh_interval 5
rotate_wait 5
tag kubernetes.*
read_from_head "true"
skip_refresh_on_startup true
@label @MEASURE
@label @CONCAT
<parse>
@type multi_format
<pattern>
format json
time_format '%Y-%m-%dT%H:%M:%S.%N%Z'
keep_time_key true
</pattern>
<pattern>
format regexp
expression /^(?<time>[^\s]+) (?<stream>stdout|stderr)( (?<logtag>.))? (?<log>.*)$/
time_format '%Y-%m-%dT%H:%M:%S.%N%:z'
keep_time_key true
</pattern>
@type regexp
expression /^(?<@timestamp>[^\s]+) (?<stream>stdout|stderr) (?<logtag>[F|P]) (?<message>.*)$/
time_key '@timestamp'
keep_time_key true
</parse>
</source>
# linux audit logs
<source>
@type tail
@id audit-input
@label @MEASURE
@label @INGRESS
path "/var/log/audit/audit.log"
pos_file "/var/lib/fluentd/pos/audit.log.pos"
tag linux-audit.log
Expand All @@ -176,7 +168,7 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type tail
@id k8s-audit-input
@label @MEASURE
@label @INGRESS
path "/var/log/kube-apiserver/audit.log"
pos_file "/var/lib/fluentd/pos/kube-apiserver.audit.log.pos"
tag k8s-audit.log
Expand All @@ -193,7 +185,7 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type tail
@id openshift-audit-input
@label @MEASURE
@label @INGRESS
path /var/log/oauth-apiserver/audit.log,/var/log/openshift-apiserver/audit.log
pos_file /var/lib/fluentd/pos/oauth-apiserver.audit.log
tag openshift-audit.log
Expand All @@ -210,7 +202,7 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type tail
@id ovn-audit-input
@label @MEASURE
@label @INGRESS
path "/var/log/ovn/acl-audit-log.log"
pos_file "/var/lib/fluentd/pos/acl-audit-log.pos"
tag ovn-audit.log
Expand All @@ -222,72 +214,11 @@ var _ = Describe("Testing Complete Config Generation", func() {
</parse>
</source>
# Increment Prometheus metrics
<label @MEASURE>
<filter **>
@type record_transformer
enable_ruby
<record>
msg_size ${record.to_s.length}
</record>
</filter>
<filter **>
@type prometheus
<metric>
name cluster_logging_collector_input_record_total
type counter
desc The total number of incoming records
<labels>
tag ${tag}
hostname ${hostname}
</labels>
</metric>
</filter>
<filter **>
@type prometheus
<metric>
name cluster_logging_collector_input_record_bytes
type counter
desc The total bytes of incoming records
key msg_size
<labels>
tag ${tag}
hostname ${hostname}
</labels>
</metric>
</filter>
<filter **>
@type record_transformer
remove_keys msg_size
</filter>
# Journal Logs go to INGRESS pipeline
<match journal>
@type relabel
@label @INGRESS
</match>
# Audit Logs go to INGRESS pipeline
<match *audit.log>
@type relabel
@label @INGRESS
</match>
# Kubernetes Logs go to CONCAT pipeline
<match kubernetes.**>
@type relabel
@label @CONCAT
</match>
</label>
# Concat log lines of container logs, and send to INGRESS pipeline
<label @CONCAT>
<filter kubernetes.**>
@type concat
key log
key message
partial_key logtag
partial_value P
separator ''
Expand Down Expand Up @@ -391,47 +322,22 @@ var _ = Describe("Testing Complete Config Generation", func() {
</rule>
</match>
# Invoke kubernetes apiserver to get kunbernetes metadata
# Invoke kubernetes apiserver to get kubernetes metadata
<filter kubernetes.**>
@id kubernetes-metadata
@type kubernetes_metadata
kubernetes_url 'https://kubernetes.default.svc'
allow_orphans false
cache_size '1000'
use_journal 'nil'
ssl_partial_chain 'true'
</filter>
# Parse Json fields for container, journal and eventrouter logs
<filter kubernetes.journal.**>
@type parse_json_field
merge_json_log 'false'
preserve_json_log 'true'
json_fields 'log,MESSAGE'
</filter>
<filter kubernetes.var.log.pods.**>
@type parse_json_field
merge_json_log 'false'
preserve_json_log 'true'
json_fields 'log,MESSAGE'
</filter>
<filter kubernetes.var.log.pods.**_eventrouter-**>
@type parse_json_field
merge_json_log true
preserve_json_log true
json_fields 'log,MESSAGE'
</filter>
# Clean kibana log fields
<filter **kibana**>
@type record_transformer
enable_ruby
<record>
log ${record['err'] || record['msg'] || record['MESSAGE'] || record['log']}
</record>
remove_keys req,res,msg,name,level,v,pid,err
json_fields 'message'
</filter>
# Fix level field in audit logs
Expand All @@ -452,21 +358,12 @@ var _ = Describe("Testing Complete Config Generation", func() {
# Viaq Data Model
<filter **>
@type viaq_data_model
elasticsearch_index_prefix_field 'viaq_index_name'
enable_prune_empty_fields false
default_keep_fields CEE,time,@timestamp,aushape,ci_job,collectd,docker,fedora-ci,file,foreman,geoip,hostname,ipaddr4,ipaddr6,kubernetes,level,message,namespace_name,namespace_uuid,offset,openstack,ovirt,pid,pipeline_metadata,rsyslog,service,systemd,tags,testcase,tlog,viaq_msg_id
extra_keep_fields ''
keep_empty_fields 'message'
use_undefined false
undefined_name 'undefined'
rename_time true
rename_time_if_missing false
src_time_name 'time'
dest_time_name '@timestamp'
pipeline_type 'collector'
undefined_to_string 'false'
undefined_dot_replace_char 'UNUSED'
undefined_max_num_fields '-1'
process_kubernetes_events 'false'
process_kubernetes_events false
<level>
name warn
match 'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"'
Expand All @@ -487,50 +384,22 @@ var _ = Describe("Testing Complete Config Generation", func() {
name debug
match 'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"'
</level>
<formatter>
tag "system.var.log**"
type sys_var_log
remove_keys host,pid,ident
</formatter>
<formatter>
tag "journal.system**"
type sys_journal
remove_keys log,stream,MESSAGE,_SOURCE_REALTIME_TIMESTAMP,__REALTIME_TIMESTAMP,CONTAINER_ID,CONTAINER_ID_FULL,CONTAINER_NAME,PRIORITY,_BOOT_ID,_CAP_EFFECTIVE,_CMDLINE,_COMM,_EXE,_GID,_HOSTNAME,_MACHINE_ID,_PID,_SELINUX_CONTEXT,_SYSTEMD_CGROUP,_SYSTEMD_SLICE,_SYSTEMD_UNIT,_TRANSPORT,_UID,_AUDIT_LOGINUID,_AUDIT_SESSION,_SYSTEMD_OWNER_UID,_SYSTEMD_SESSION,_SYSTEMD_USER_UNIT,CODE_FILE,CODE_FUNCTION,CODE_LINE,ERRNO,MESSAGE_ID,RESULT,UNIT,_KERNEL_DEVICE,_KERNEL_SUBSYSTEM,_UDEV_SYSNAME,_UDEV_DEVNODE,_UDEV_DEVLINK,SYSLOG_FACILITY,SYSLOG_IDENTIFIER,SYSLOG_PID
</formatter>
<formatter>
tag "kubernetes.journal.container**"
type k8s_journal
remove_keys 'log,stream,MESSAGE,_SOURCE_REALTIME_TIMESTAMP,__REALTIME_TIMESTAMP,CONTAINER_ID,CONTAINER_ID_FULL,CONTAINER_NAME,PRIORITY,_BOOT_ID,_CAP_EFFECTIVE,_CMDLINE,_COMM,_EXE,_GID,_HOSTNAME,_MACHINE_ID,_PID,_SELINUX_CONTEXT,_SYSTEMD_CGROUP,_SYSTEMD_SLICE,_SYSTEMD_UNIT,_TRANSPORT,_UID,_AUDIT_LOGINUID,_AUDIT_SESSION,_SYSTEMD_OWNER_UID,_SYSTEMD_SESSION,_SYSTEMD_USER_UNIT,CODE_FILE,CODE_FUNCTION,CODE_LINE,ERRNO,MESSAGE_ID,RESULT,UNIT,_KERNEL_DEVICE,_KERNEL_SUBSYSTEM,_UDEV_SYSNAME,_UDEV_DEVNODE,_UDEV_DEVLINK,SYSLOG_FACILITY,SYSLOG_IDENTIFIER,SYSLOG_PID'
</formatter>
<formatter>
tag "kubernetes.var.log.pods.**_eventrouter-** k8s-audit.log** openshift-audit.log** ovn-audit.log**"
type k8s_json_file
remove_keys log,stream,CONTAINER_ID_FULL,CONTAINER_NAME
remove_keys stream
process_kubernetes_events 'true'
</formatter>
<formatter>
tag "kubernetes.var.log.pods**"
type k8s_json_file
remove_keys log,stream,CONTAINER_ID_FULL,CONTAINER_NAME
remove_keys stream
</formatter>
<elasticsearch_index_name>
enabled 'true'
tag "kubernetes.var.log.pods.openshift-*_** kubernetes.var.log.pods.default_** kubernetes.var.log.pods.kube-*_** journal.system** system.var.log**"
name_type static
static_index_name infra-write
</elasticsearch_index_name>
<elasticsearch_index_name>
enabled 'true'
tag "linux-audit.log** k8s-audit.log** openshift-audit.log** ovn-audit.log**"
name_type static
static_index_name audit-write
</elasticsearch_index_name>
<elasticsearch_index_name>
enabled 'true'
tag "**"
name_type static
static_index_name app-write
</elasticsearch_index_name>
</filter>
# Generate elasticsearch id
Expand Down Expand Up @@ -630,6 +499,10 @@ var _ = Describe("Testing Complete Config Generation", func() {
# Viaq Data Model
<filter **>
@type viaq_data_model
enable_openshift_model false
enable_prune_empty_fields false
rename_time false
undefined_dot_replace_char UNUSED
elasticsearch_index_prefix_field 'viaq_index_name'
<elasticsearch_index_name>
enabled 'true'
Expand Down

0 comments on commit 7c4de85

Please sign in to comment.