Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-5.5] LOG-2957: optimize fluent conf for throughput #1592

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 2 additions & 4 deletions hack/testing-olm/test-040-eventrouter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,7 @@ spec:
memory: 1Gi
cpu: 100m
collection:
logs:
type: "fluentd"
fluentd: {}
type: "fluentd"
EOL

deploy_eventrouter
Expand All @@ -152,7 +150,7 @@ espod=$(oc -n $LOGGING_NS get pods -l component=elasticsearch -o jsonpath={.item
os::log::info "Testing Elasticsearch pod ${espod}..."
os::cmd::try_until_text "oc -n $LOGGING_NS exec -c elasticsearch ${espod} -- es_util --query=/ --request HEAD --head --output /dev/null --write-out %{response_code}" "200" "$(( 1*$minute ))"

warn_nonformatted 'infra'
warn_nonformatted 'infra-*'

evpod=$(oc -n $LOGGING_NS get pods -l component=eventrouter -o jsonpath={.items[0].metadata.name})

Expand Down
1 change: 1 addition & 0 deletions internal/collector/fluentd/visitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func CollectorVisitor(collectorContainer *v1.Container, podSpec *v1.PodSpec) {
},
},
},
v1.EnvVar{Name: "RUBY_GC_HEAP_OLDOBJECT_LIMIT_FACTOR", Value: "0.9"},
)
collectorContainer.VolumeMounts = append(collectorContainer.VolumeMounts,
v1.VolumeMount{Name: certsVolumeName, ReadOnly: true, MountPath: certsVolumePath},
Expand Down
4 changes: 0 additions & 4 deletions internal/generator/fluentd/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ func Conf(clspec *logging.CollectionSpec, secrets map[string]*corev1.Secret, clf
Sources(clspec, clfspec, op),
"Set of all input sources",
},
{
PrometheusMetrics(clfspec, op),
"Section to add measurement, and dispatch to Concat or Ingress pipelines",
},
{
Concat(clfspec, op),
`Concat pipeline section`,
Expand Down
159 changes: 26 additions & 133 deletions internal/generator/fluentd/conf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type systemd
@id systemd-input
@label @MEASURE
@label @INGRESS
path '/var/log/journal'
<storage>
@type local
Expand All @@ -145,36 +145,28 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type tail
@id container-input
path "/var/log/pods/**/*.log"
exclude_path ["/var/log/pods/openshift-logging_collector-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log"]
path "/var/log/pods/*/*/*.log"
exclude_path ["/var/log/pods/openshift-logging_collector-*/*/*.log", "/var/log/pods/openshift-logging_elasticsearch-*/*/*.log", "/var/log/pods/openshift-logging_kibana-*/*/*.log", "/var/log/pods/*/*/*.gz", "/var/log/pods/*/*/*.tmp"]
pos_file "/var/lib/fluentd/pos/es-containers.log.pos"
refresh_interval 5
rotate_wait 5
tag kubernetes.*
read_from_head "true"
skip_refresh_on_startup true
@label @MEASURE
@label @CONCAT
<parse>
@type multi_format
<pattern>
format json
time_format '%Y-%m-%dT%H:%M:%S.%N%Z'
keep_time_key true
</pattern>
<pattern>
format regexp
expression /^(?<time>[^\s]+) (?<stream>stdout|stderr)( (?<logtag>.))? (?<log>.*)$/
time_format '%Y-%m-%dT%H:%M:%S.%N%:z'
keep_time_key true
</pattern>
@type regexp
expression /^(?<@timestamp>[^\s]+) (?<stream>stdout|stderr) (?<logtag>[F|P]) (?<message>.*)$/
time_key '@timestamp'
keep_time_key true
</parse>
</source>

# linux audit logs
<source>
@type tail
@id audit-input
@label @MEASURE
@label @INGRESS
path "/var/log/audit/audit.log"
pos_file "/var/lib/fluentd/pos/audit.log.pos"
tag linux-audit.log
Expand All @@ -187,7 +179,7 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type tail
@id k8s-audit-input
@label @MEASURE
@label @INGRESS
path "/var/log/kube-apiserver/audit.log"
pos_file "/var/lib/fluentd/pos/kube-apiserver.audit.log.pos"
tag k8s-audit.log
Expand All @@ -204,7 +196,7 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type tail
@id openshift-audit-input
@label @MEASURE
@label @INGRESS
path /var/log/oauth-apiserver/audit.log,/var/log/openshift-apiserver/audit.log
pos_file /var/lib/fluentd/pos/oauth-apiserver.audit.log
tag openshift-audit.log
Expand All @@ -221,7 +213,7 @@ var _ = Describe("Testing Complete Config Generation", func() {
<source>
@type tail
@id ovn-audit-input
@label @MEASURE
@label @INGRESS
path "/var/log/ovn/acl-audit-log.log"
pos_file "/var/lib/fluentd/pos/acl-audit-log.pos"
tag ovn-audit.log
Expand All @@ -233,72 +225,11 @@ var _ = Describe("Testing Complete Config Generation", func() {
</parse>
</source>

# Increment Prometheus metrics
<label @MEASURE>
<filter **>
@type record_transformer
enable_ruby
<record>
msg_size ${record.to_s.length}
</record>
</filter>

<filter **>
@type prometheus
<metric>
name cluster_logging_collector_input_record_total
type counter
desc The total number of incoming records
<labels>
tag ${tag}
hostname ${hostname}
</labels>
</metric>
</filter>

<filter **>
@type prometheus
<metric>
name cluster_logging_collector_input_record_bytes
type counter
desc The total bytes of incoming records
key msg_size
<labels>
tag ${tag}
hostname ${hostname}
</labels>
</metric>
</filter>

<filter **>
@type record_transformer
remove_keys msg_size
</filter>

# Journal Logs go to INGRESS pipeline
<match journal>
@type relabel
@label @INGRESS
</match>

# Audit Logs go to INGRESS pipeline
<match *audit.log>
@type relabel
@label @INGRESS
</match>

# Kubernetes Logs go to CONCAT pipeline
<match kubernetes.**>
@type relabel
@label @CONCAT
</match>
</label>

# Concat log lines of container logs, and send to INGRESS pipeline
<label @CONCAT>
<filter kubernetes.**>
@type concat
key log
key message
partial_key logtag
partial_value P
separator ''
Expand Down Expand Up @@ -402,49 +333,23 @@ var _ = Describe("Testing Complete Config Generation", func() {
</rule>
</match>

# Invoke kubernetes apiserver to get kunbernetes metadata
# Invoke kubernetes apiserver to get kubernetes metadata
<filter kubernetes.**>
@id kubernetes-metadata
@type kubernetes_metadata
kubernetes_url 'https://kubernetes.default.svc'
annotation_match ["^containerType\.logging\.openshift\.io\/.*$"]
allow_orphans false
cache_size '1000'
de_dot false
use_journal 'nil'
ssl_partial_chain 'true'
</filter>

# Parse Json fields for container, journal and eventrouter logs
<filter kubernetes.journal.**>
@type parse_json_field
merge_json_log 'false'
preserve_json_log 'true'
json_fields 'log,MESSAGE'
</filter>

<filter kubernetes.var.log.pods.**>
@type parse_json_field
merge_json_log 'false'
preserve_json_log 'true'
json_fields 'log,MESSAGE'
</filter>

<filter kubernetes.var.log.pods.**_eventrouter-**>
@type parse_json_field
merge_json_log true
preserve_json_log true
json_fields 'log,MESSAGE'
</filter>

# Clean kibana log fields
<filter **kibana**>
@type record_transformer
enable_ruby
<record>
log ${record['err'] || record['msg'] || record['MESSAGE'] || record['log']}
</record>
remove_keys req,res,msg,name,level,v,pid,err
json_fields 'message'
</filter>

# Fix level field in audit logs
Expand All @@ -466,21 +371,12 @@ var _ = Describe("Testing Complete Config Generation", func() {
<filter **>
@type viaq_data_model
enable_flatten_labels true
elasticsearch_index_prefix_field 'viaq_index_name'
enable_prune_empty_fields false
default_keep_fields CEE,time,@timestamp,aushape,ci_job,collectd,docker,fedora-ci,file,foreman,geoip,hostname,ipaddr4,ipaddr6,kubernetes,level,message,namespace_name,namespace_uuid,offset,openstack,ovirt,pid,pipeline_metadata,rsyslog,service,systemd,tags,testcase,tlog,viaq_msg_id
extra_keep_fields ''
keep_empty_fields 'message'
use_undefined false
undefined_name 'undefined'
rename_time true
rename_time_if_missing false
src_time_name 'time'
dest_time_name '@timestamp'
pipeline_type 'collector'
undefined_to_string 'false'
undefined_dot_replace_char 'UNUSED'
undefined_max_num_fields '-1'
process_kubernetes_events 'false'
process_kubernetes_events false
<level>
name warn
match 'Warning|WARN|^W[0-9]+|level=warn|Value:warn|"level":"warn"'
Expand All @@ -501,31 +397,21 @@ var _ = Describe("Testing Complete Config Generation", func() {
name debug
match 'Debug|DEBUG|^D[0-9]+|level=debug|Value:debug|"level":"debug"'
</level>
<formatter>
tag "system.var.log**"
type sys_var_log
remove_keys host,pid,ident
</formatter>
<formatter>
tag "journal.system**"
type sys_journal
remove_keys log,stream,MESSAGE,_SOURCE_REALTIME_TIMESTAMP,__REALTIME_TIMESTAMP,CONTAINER_ID,CONTAINER_ID_FULL,CONTAINER_NAME,PRIORITY,_BOOT_ID,_CAP_EFFECTIVE,_CMDLINE,_COMM,_EXE,_GID,_HOSTNAME,_MACHINE_ID,_PID,_SELINUX_CONTEXT,_SYSTEMD_CGROUP,_SYSTEMD_SLICE,_SYSTEMD_UNIT,_TRANSPORT,_UID,_AUDIT_LOGINUID,_AUDIT_SESSION,_SYSTEMD_OWNER_UID,_SYSTEMD_SESSION,_SYSTEMD_USER_UNIT,CODE_FILE,CODE_FUNCTION,CODE_LINE,ERRNO,MESSAGE_ID,RESULT,UNIT,_KERNEL_DEVICE,_KERNEL_SUBSYSTEM,_UDEV_SYSNAME,_UDEV_DEVNODE,_UDEV_DEVLINK,SYSLOG_FACILITY,SYSLOG_IDENTIFIER,SYSLOG_PID
</formatter>
<formatter>
tag "kubernetes.journal.container**"
type k8s_journal
remove_keys 'log,stream,MESSAGE,_SOURCE_REALTIME_TIMESTAMP,__REALTIME_TIMESTAMP,CONTAINER_ID,CONTAINER_ID_FULL,CONTAINER_NAME,PRIORITY,_BOOT_ID,_CAP_EFFECTIVE,_CMDLINE,_COMM,_EXE,_GID,_HOSTNAME,_MACHINE_ID,_PID,_SELINUX_CONTEXT,_SYSTEMD_CGROUP,_SYSTEMD_SLICE,_SYSTEMD_UNIT,_TRANSPORT,_UID,_AUDIT_LOGINUID,_AUDIT_SESSION,_SYSTEMD_OWNER_UID,_SYSTEMD_SESSION,_SYSTEMD_USER_UNIT,CODE_FILE,CODE_FUNCTION,CODE_LINE,ERRNO,MESSAGE_ID,RESULT,UNIT,_KERNEL_DEVICE,_KERNEL_SUBSYSTEM,_UDEV_SYSNAME,_UDEV_DEVNODE,_UDEV_DEVLINK,SYSLOG_FACILITY,SYSLOG_IDENTIFIER,SYSLOG_PID'
</formatter>
<formatter>
tag "kubernetes.var.log.pods.**_eventrouter-** k8s-audit.log** openshift-audit.log** ovn-audit.log**"
type k8s_json_file
remove_keys log,stream,CONTAINER_ID_FULL,CONTAINER_NAME
remove_keys stream
process_kubernetes_events 'true'
</formatter>
<formatter>
tag "kubernetes.var.log.pods**"
type k8s_json_file
remove_keys log,stream,CONTAINER_ID_FULL,CONTAINER_NAME
remove_keys stream
</formatter>
</filter>

Expand Down Expand Up @@ -626,6 +512,10 @@ var _ = Describe("Testing Complete Config Generation", func() {
# Viaq Data Model
<filter **>
@type viaq_data_model
enable_openshift_model false
enable_prune_empty_fields false
rename_time false
undefined_dot_replace_char UNUSED
elasticsearch_index_prefix_field 'viaq_index_name'
<elasticsearch_index_name>
enabled 'true'
Expand All @@ -649,6 +539,9 @@ var _ = Describe("Testing Complete Config Generation", func() {
<filter **>
@type viaq_data_model
enable_prune_labels true
enable_openshift_model false
rename_time false
undefined_dot_replace_char UNUSED
prune_labels_exclusions app.kubernetes.io/name,app.kubernetes.io/instance,app.kubernetes.io/version,app.kubernetes.io/component,app.kubernetes.io/part-of,app.kubernetes.io/managed-by,app.kubernetes.io/created-by
</filter>

Expand Down