
Got "buffer flush took longer time than slow_flush_log_threshold" error #805

Open
chikinchoi opened this issue Sep 10, 2020 · 40 comments

@chikinchoi

Problem

Hi Team,

I am getting a lot of buffer flush took longer time than slow_flush_log_threshold: elapsed_time=60.16122445899964 slow_flush_log_threshold=20.0 plugin_id="firelens_es" errors in my Fluentd, which is connected to Elasticsearch.
I saw a document [1] that says the reason for this error is that Elasticsearch is exhausted. However, I checked all of the Elasticsearch clusters and their CPU usage is low and healthy.

I asked in the Fluentd group and @repeatedly said that I should check the Fluentd CPU usage. I found that the Fluentd service CPU usage is very high (almost 100%) when the slow_flush error appears. I increased the Fluentd CPU size from 1024 to 2048; although the number of slow_flush errors decreased, I can still see them sometimes.
In conclusion, I think the slow_flush error causes the Fluentd CPU usage to increase. Therefore, I would like to know the cause of this error and how to fix it. Also, I don't understand why I should check the Fluentd CPU usage instead of the Elasticsearch CPU usage.
Please advise. Thank you very much!!

Steps to replicate

Cannot reproduce, because I don't know the cause of this error.
Below is my Fluentd config:

<system>
  @log_level debug
  workers 4
  root_dir /var/log/fluent/
</system>

<source>
  @type  forward
  @id    input1
  @log_level debug
  port  24224
  bind 0.0.0.0
</source>

# Used for docker health check
<source>
  @type http
  port 8888
  bind 0.0.0.0
</source>

# records sent for health checking won't be forwarded anywhere
<match health**>
  @type null
</match>

# Prometheus Configuration
# count number of incoming records per tag
<filter **>
  @type prometheus
  <metric>
    name fluentd_input_status_num_records_total
    type counter
    desc The total number of incoming records
    <labels>
      tag ${tag}
      hostname ${hostname}
    </labels>
  </metric>
</filter>

<filter **firelens**>
  @type concat
  key log
  multiline_start_regexp '^\{\\"@timestamp'
  multiline_end_regexp '/\}/'
  separator ""
  flush_interval 1
  timeout_label @NORMAL
</filter>

<filter **firelens**>
  @type parser
  key_name log
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
  @type json
  </parse>
</filter>

<filter **firelens**>
  @type record_transformer
  enable_ruby
  <record>
    taskDef ${record["ecs_task_definition"].gsub(/:.*/, '')}
  </record>
</filter>

<filter lambdaNode**>
  @type parser
  key_name data
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
  @type json
  </parse>
</filter>

<filter lambdaPython**>
  @type record_transformer
  enable_ruby
  <record>
    function_name ${record["function_name"].gsub(/:.*/, '')}
  </record>
</filter>

<filter lambdaPython**>
  @type parser
  key_name message
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
  @type json
  </parse>
</filter>

# count number of outgoing records per tag
<match **firelens**>
  @type copy
  @id firelens
  <store>
    @type elasticsearch
    @id firelens_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    ssl_verify false
    log_es_400_reason true
    index_name ${taskDef}-%Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    sniffer_class_name Fluent::Plugin::ElasticsearchSimpleSniffer
    request_timeout 2147483648
    <buffer tag, time, ecs_task_definition, taskDef>
      @type file
      flush_mode lazy
      flush_thread_count 8
      # path /var/log/fluent/firelens.*.buffer
      total_limit_size 8GB
      chunk_limit_size 80MB
      timekey 5s
      timekey_wait 1s
      overflow_action drop_oldest_chunk
      retry_max_interval 16s
      disable_chunk_backup true
      retry_forever false
    </buffer>
  </store>
  <store>
    @type prometheus
    @id firelens_pro
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of outgoing records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>
</match>

<match lambdaNode**>
  @type copy
  @id lambdaNode
  <store>
    @type elasticsearch
    @id lambdaNode_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    include_timestamp true
    ssl_verify false
    log_es_400_reason true
    index_name ${$.context.functionName}-%Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    sniffer_class_name Fluent::Plugin::ElasticsearchSimpleSniffer
    reload_after 100
    <buffer tag, time, $.context.functionName>
      flush_mode lazy
      chunk_limit_size 5MB
      flush_thread_count 8
      total_limit_size 512MB
      timekey 5s
      timekey_wait 1s
      retry_max_interval 16s
      disable_chunk_backup true
    </buffer>
  </store>
  <store>
    @type prometheus
    @id lambdaNode_pro
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of outgoing records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>
</match>

<match lambdaPython**>
  @type copy
  @id lambdaPython
  <store>
    @type elasticsearch
    @id lambdaPython_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    include_timestamp true
    ssl_verify false
    log_es_400_reason true
    index_name ${function_name}-%Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    sniffer_class_name Fluent::Plugin::ElasticsearchSimpleSniffer
    reload_after 100
    <buffer tag, time, function_name>
      flush_mode lazy
      chunk_limit_size 5MB
      flush_thread_count 8
      total_limit_size 512MB
      timekey 5s
      timekey_wait 1s
      retry_max_interval 16s
      disable_chunk_backup true
    </buffer>
  </store>
  <store>
    @type prometheus
    @id lambdaPython_pro
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of outgoing records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>
</match>

<label @FLUENT_LOG>
  <match fluent.*>
    @type null
  </match>
</label>

<label @NORMAL>
  <match **>
    @type null
  </match>
</label>

# expose metrics in prometheus format
<source>
  @type prometheus
  bind 0.0.0.0
  port 24231
  metrics_path /metrics
</source>

<source>
  @type prometheus_output_monitor
  interval 10
  <labels>
    hostname ${hostname}
  </labels>
</source>

Expected Behavior or What you need to ask

No such error.

Using Fluentd and ES plugin versions

Fluentd is running in an AWS ECS Fargate service.
fluentd version: v1.11.1
fluent-plugin-elasticsearch version: 4.1.0

@cosmo0920 (Collaborator) commented Sep 10, 2020

Therefore, I would like to know the cause of this error and how to fix it.

This means the buffer operations are getting jammed.

Also, I don't understand why I should check the Fluentd CPU usage instead of the Elasticsearch CPU usage.

These warnings are caused by high CPU usage on the Fluentd side and/or a lack of ingestion capacity on the Elasticsearch cluster.

When I/O usage is high on the Fluentd side, buffer operations run frequently, which raises the probability of exceeding the buffer operation capacity. So a high ingestion rate can cause the slow_flush_log_threshold warning.

A lack of Elasticsearch ingestion capacity also causes unprocessed buffers to pile up, and those piled-up buffers cause this error as well.

@chikinchoi (Author) commented Sep 10, 2020

Therefore, I would like to know the cause of this error and how to fix it.

This means the buffer operations are getting jammed.

Also, I don't understand why I should check the Fluentd CPU usage instead of the Elasticsearch CPU usage.

These warnings are caused by high CPU usage on the Fluentd side and/or a lack of ingestion capacity on the Elasticsearch cluster.

When I/O usage is high on the Fluentd side, buffer operations run frequently, which raises the probability of exceeding the buffer operation capacity. So a high ingestion rate can cause the slow_flush_log_threshold warning.

A lack of Elasticsearch ingestion capacity also causes unprocessed buffers to pile up, and those piled-up buffers cause this error as well.

Thank you for your prompt reply!
I understand that flushing the buffer too frequently causes this slow_flush_log_threshold error, but we cannot increase the flush_interval because we would like logs to reach Elasticsearch within 5 seconds.
As I am using multiple workers (currently 4), should I increase or decrease the number of workers to fix this error? If not, what else can I do to fix it?
Thank you.

@chikinchoi (Author)

Hi @cosmo0920,

I am still struggling to find a solution to this issue.
I have increased the flush_thread_count from 8 to 32 but still get the slow_flush error. What is the ideal value for this parameter?
Also, I noticed that I didn't add the queue_limit_length parameter to the Fluentd config. Is it the same parameter as total_limit_size, according to [1]?
Prometheus was added to monitor the Fluentd performance, but there is no detailed description of each metric. Which metrics should I check when I get a slow_flush warning?
For example, which metric should I check if I want to know whether all flush threads are busy?
Please help.

[1] https://docs.fluentd.org/configuration/buffer-section#buffering-parameters

@cosmo0920 (Collaborator) commented Sep 18, 2020

Sorry for the late response.

I understand that flushing the buffer too frequently causes this slow_flush_log_threshold error, but we cannot increase the flush_interval because we would like logs to reach Elasticsearch within 5 seconds.
As I am using multiple workers (currently 4), should I increase or decrease the number of workers to fix this error? If not, what else can I do to fix it?

The Elasticsearch bulk API queue is capped at a queue_size of 200 by default.
We don't recommend making bulk API requests that frequently.
Frequent bulk API requests cause document insertions to be rejected.
You can obtain Elasticsearch queue metrics with GET /_nodes/thread_pool.

ref: https://www.elastic.co/blog/why-am-i-seeing-bulk-rejections-in-my-elasticsearch-cluster

Example, using cURL in my development environment:

$ curl localhost:9200/_nodes/thread_pool | jq . 
{
  "_nodes": {
    "total": 1,
    "successful": 1,
    "failed": 0
  },
  "cluster_name": "elasticsearch_cosmo",
  "nodes": {
    "Sf-WnAEOQN-AbE8lj9YirA": {
      "name": "Hiroshi-no-MacBook-Pro.local",
      "transport_address": "127.0.0.1:9300",
      "host": "127.0.0.1",
      "ip": "127.0.0.1",
      "version": "7.8.1",
      "build_flavor": "oss",
      "build_type": "tar",
      "build_hash": "b5ca9c58fb664ca8bf9e4057fc229b3396bf3a89",
      "roles": [
        "data",
        "ingest",
        "master",
        "remote_cluster_client"
      ],
      "thread_pool": {
        "force_merge": {
          "type": "fixed",
          "size": 1,
          "queue_size": -1
        },
        "fetch_shard_started": {
          "type": "scaling",
          "core": 1,
          "max": 8,
          "keep_alive": "5m",
          "queue_size": -1
        },
        "listener": {
          "type": "fixed",
          "size": 2,
          "queue_size": -1
        },
        "refresh": {
          "type": "scaling",
          "core": 1,
          "max": 2,
          "keep_alive": "5m",
          "queue_size": -1
        },
        "generic": {
          "type": "scaling",
          "core": 4,
          "max": 128,
          "keep_alive": "30s",
          "queue_size": -1
        },
        "warmer": {
          "type": "scaling",
          "core": 1,
          "max": 2,
          "keep_alive": "5m",
          "queue_size": -1
        },
        "search": {
          "type": "fixed_auto_queue_size",
          "size": 7,
          "queue_size": 1000
        },
        "flush": {
          "type": "scaling",
          "core": 1,
          "max": 2,
          "keep_alive": "5m",
          "queue_size": -1
        },
        "fetch_shard_store": {
          "type": "scaling",
          "core": 1,
          "max": 8,
          "keep_alive": "5m",
          "queue_size": -1
        },
        "management": {
          "type": "scaling",
          "core": 1,
          "max": 5,
          "keep_alive": "5m",
          "queue_size": -1
        },
        "get": {
          "type": "fixed",
          "size": 4,
          "queue_size": 1000
        },
        "analyze": {
          "type": "fixed",
          "size": 1,
          "queue_size": 16
        },
        "write": {
          "type": "fixed",
          "size": 4,
          "queue_size": 200
        },
        "snapshot": {
          "type": "scaling",
          "core": 1,
          "max": 2,
          "keep_alive": "5m",
          "queue_size": -1
        },
        "search_throttled": {
          "type": "fixed_auto_queue_size",
          "size": 1,
          "queue_size": 100
        }
      }
    }
  }
}

@chikinchoi (Author) commented Sep 22, 2020

Hi @cosmo0920 ,

Thank you for your suggestion, I found that the thread_pool queue_size is 1000 now.

"thread_pool" : {
"watcher" : {
  "type" : "fixed",
  "size" : 40,
  "queue_size" : 1000
},

Also, according to @repeatedly's comment in https://groups.google.com/g/fluentd/c/3xrieNheguE , I have optimized my Fluentd config (increased flush_thread_count to 16, increased timekey to 9s, used record_modifier instead of record_transformer, and removed the tag chunk key).

<system>
  @log_level debug
  workers 4
  root_dir /var/log/fluent/
</system>

<source>
  @type  forward
  @id    input1
  @log_level debug
  port  24224
  bind 0.0.0.0
</source>

# Used for docker health check
<source>
  @type http
  port 8888
  bind 0.0.0.0
</source>

# records sent for health checking won't be forwarded anywhere
<match health**>
  @type null
</match>

# Prometheus Configuration
# count number of incoming records per tag
<filter **>
  @type prometheus
  <metric>
    name fluentd_input_status_num_records_total
    type counter
    desc The total number of incoming records
    <labels>
      tag ${tag}
      hostname ${hostname}
    </labels>
  </metric>
</filter>

<filter **firelens**>
  @type concat
  key log
  multiline_start_regexp '^\{\\"@timestamp'
  multiline_end_regexp '/\}/'
  separator ""
  flush_interval 1
  timeout_label @NORMAL
</filter>

<filter **firelens**>
  @type parser
  key_name log
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
  @type json
  </parse>
</filter>

<filter **firelens**>
  @type record_modifier
  <record>
    taskDef ${record["ecs_task_definition"].gsub(/:.*/, '')}
  </record>
</filter>

<filter lambdaNode**>
  @type parser
  key_name data
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
  @type json
  </parse>
</filter>

<filter lambdaPython**>
  @type parser
  key_name message
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
  @type json
  </parse>
</filter>

# count number of outgoing records per tag
<match **firelens**>
  @type copy
  @id firelens
  <store>
    @type elasticsearch
    @id firelens_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    ssl_verify false
    log_es_400_reason true
    index_name ${taskDef}-%Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    request_timeout 2147483648
    <buffer time, taskDef>
      @type file
      flush_mode lazy
      flush_thread_count 16
      total_limit_size 15GB
      chunk_limit_size 80MB
      timekey 9s
      timekey_wait 1s
      overflow_action drop_oldest_chunk
      retry_max_interval 16s
      disable_chunk_backup true
      retry_forever false
    </buffer>
  </store>
  <store>
    @type prometheus
    @id firelens_pro
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of outgoing records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>
</match>

<match lambdaNode**>
  @type copy
  @id lambdaNode
  <store>
    @type elasticsearch
    @id lambdaNode_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    include_timestamp true
    ssl_verify false
    log_es_400_reason true
    index_name ${$.context.functionName}-%Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    request_timeout 2147483648
    <buffer time, $.context.functionName>
      flush_mode lazy
      chunk_limit_size 5MB
      flush_thread_count 16
      total_limit_size 1024MB
      timekey 9s
      timekey_wait 1s
      retry_max_interval 16s
      disable_chunk_backup true
    </buffer>
  </store>
  <store>
    @type prometheus
    @id lambdaNode_pro
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of outgoing records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>
</match>

<match lambdaPython**>
  @type copy
  @id lambdaPython
  <store>
    @type elasticsearch
    @id lambdaPython_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    include_timestamp true
    ssl_verify false
    log_es_400_reason true
    index_name ${function_name}-%Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    request_timeout 2147483648
    <buffer time, function_name>
      flush_mode lazy
      chunk_limit_size 5MB
      flush_thread_count 16
      total_limit_size 1024MB
      timekey 9s
      timekey_wait 1s
      retry_max_interval 16s
      disable_chunk_backup true
    </buffer>
  </store>
  <store>
    @type prometheus
    @id lambdaPython_pro
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of outgoing records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>
</match>

<label @FLUENT_LOG>
  <match fluent.*>
    @type null
  </match>
</label>

<label @NORMAL>
  <match **>
    @type null
  </match>
</label>

# expose metrics in prometheus format
<source>
  @type prometheus
  bind 0.0.0.0
  port 24231
  metrics_path /metrics
</source>

<source>
  @type prometheus_output_monitor
  interval 10
  <labels>
    hostname ${hostname}
  </labels>
</source>

In addition, in order to find the root cause of the slow_flush warning, I used PromQL to make the Prometheus data meaningful, following [1]. I can see that the number of incoming records is not very high (~2k/s). However, the "maximum buffer length in last 1min" always increases and reaches the max buffer length (I set flush_thread_count = 16). Is it normal behavior for the buffer length to keep increasing? Could anyone help to check whether any data in the graphs below looks abnormal?

[Screenshots: Prometheus graphs of incoming records, buffer length, and buffer available space ratio]

Also, I have consulted Elasticsearch support; they checked the health status of the Elasticsearch cluster and everything seems fine.
[Screenshot: Elasticsearch cluster health status]

[1] https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus

Please help. Thank you very much!

@cosmo0920 (Collaborator) commented Sep 23, 2020

I'm not really sure how to fix this.
But the retry count does not seem to be increasing.
Sending logs to Elasticsearch is working fine, apart from the complaints about the slow flush log threshold.

fluentd_output_status_buffer_available_space_ratio says that the Fluentd workers have plenty of buffer space left (around 99%).
I think the maximum buffer bytes does not hit the buffer limit (total_limit_size).
So I guess these slow flush log threshold warnings are caused by frequent flushing, which fragments the buffer and shrinks chunk sizes.

@chikinchoi (Author)

Hi @cosmo0920 ,
I have turned on the "with_transporter_log" parameter and found the following log entry right before the slow_flush warning.

2020-09-28 02:44:56 +0000 [info]: #3 [firelens_es] POST https://fluentd-user:**********@corxxxxxxxx-dev.xxx.org.hk:9200/_bulk [status:200, request:939.370s, query:0.003s]
2020-09-28 02:44:56 +0000 [warn]: #3 [firelens_es] buffer flush took longer time than slow_flush_log_threshold: elapsed_time=939.3784740579977 slow_flush_log_threshold=20.0 plugin_id="firelens_es"

It seems the request time of the _bulk request caused the slow_flush_log warning. Is this log meaningful to you?
Thank you!

@sumit2904 commented Nov 18, 2020

I am facing the same issue

buffer flush took longer time than slow_flush_log_threshold: elapsed_time=21.436356759862974 slow_flush_log_threshold=20.0 plugin_id="object:3fe37b8f2a48"

This is my config right now.

<system>
  workers 2
</system>
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>
<match *.**>
  @type copy
  <store>
    @type elasticsearch
    hosts host1,host2,host3
    port 9200
    user username
    password ******
    logstash_format true
    logstash_prefix fluentd
    logstash_dateformat %Y%m%d
    include_tag_key true
    type_name access_log
    tag_key @log_name
    include_timestamp true
    #index_name fluentd
  </store>
  <store>
    @type stdout
  </store>
  <buffer tag>
    @type memory # or file
    flush_interval 1
    chunk_limit_size 1m
    flush_thread_interval 0.1
    flush_thread_burst_interval 0.01
    flush_thread_count 15
    total_limit_size 2GB
  </buffer>
</match>

@chikinchoi I am facing the same issue as you. I would love to connect with you to understand more and fix this issue.

@yashumitsu

Hi, please try reducing the bulk_message_request_threshold size (from the default 20 MB to 1-2 MB). It seems to make delivery more stable.
We experienced the same problem with a high message rate: the Elasticsearch flush time grew constantly, and the fluentd_output_status_emit_records metric was very spiky.
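For reference, a minimal sketch of where this option sits in an elasticsearch output block (the match pattern, host, buffer path, and the 2 MB value are placeholder assumptions for illustration, not tested recommendations):

<match app.**>
  @type elasticsearch
  host elasticsearch.example.com        # placeholder host
  port 9200
  # split bulk payloads at ~2 MB instead of the 20 MB default
  bulk_message_request_threshold 2MB
  <buffer>
    @type file
    path /var/log/fluent/app.buffer     # placeholder path
    flush_interval 5s
    chunk_limit_size 8MB
  </buffer>
</match>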

@g3kr commented Dec 14, 2020

@cosmo0920 @chikinchoi how were you able to resolve this? Can you share your resolution?

@adrdimitrov commented Jul 28, 2021

@chikinchoi @cosmo0920 Hey guys,

I am facing almost exactly the same issue and can't fix it. I see that this thread has been quiet for almost half a year; were you able to fix the issue, and if so, how did you manage to do it?

I have Fluentd on Kubernetes, with 90% of the logs generated by one of the nodes. I have tried many configurations with small or big flush_thread_count, different buffer sizes, and so on, but I couldn't find a configuration that works. Elasticsearch seems totally fine, but I constantly get the slow flush error mentioned here, and my buffer constantly overflows (the strange thing is that it does not overflow at the moments of highest log load). Any ideas or help would be much appreciated.

Current aggregator config:

<match **>
  @type copy
  <store>
    @type elasticsearch
    hosts hostname
    request_timeout 30s
    resurrect_after 5
    # avoid https://discuss.elastic.co/t/elasitcsearch-ruby-raises-cannot-get
    # -new-connection-from-pool-error/36252/6
    reload_connections false
    reconnect_on_error true
    reload_on_failure true
    logstash_format true
    logstash_prefix logs-eks-s-test-1
    logstash_dateformat %Y.%m.%d
    # @timestamp: use event time, not time of indexing
    time_key time
    include_tag_key true
    include_timestamp true
    <buffer>
      @type file
      path /opt/bitnami/fluentd/logs/buffers
      flush_interval 1s
      flush_thread_count 20
      chunk_limit_size 16m
      total_limit_size 2048m
      queued_chunks_limit_size 4096
      overflow_action drop_oldest_chunk
      retry_forever true
    </buffer>
  </store>
</match>

@yashumitsu

@adrdimitrov Which version do you use?
I think it has been fixed by now. We use td-agent 4.1 (which contains fluent-plugin-elasticsearch 4.3.3) without problems.

@adrdimitrov

Seems to be related to #885; I have mentioned my versions there.

@chikinchoi (Author)

@yashumitsu, I have updated the fluent-plugin-elasticsearch version from 4.2.2 to 4.3.3 but can still see the warning "buffer flush took longer time than slow_flush_log_threshold: elapsed_time=939.1508707999999 slow_flush_log_threshold=20.0 plugin_id="firelens_es"".
May I know what difference 4.3.3 was expected to make for this issue?

@g3kr commented Aug 16, 2021

@yashumitsu I am using fluentd version 1.11.4 and fluentd-elasticsearch plugin version 5.0.0 and still experiencing this issue.

@yashumitsu

@chikinchoi, @g3kr

I mentioned it here:

I was thinking the bulk_message_request_threshold default value was changed in 4.3.1:

But unfortunately (maybe I'm missing something), it seems to remain the same:

Can you try with an explicit bulk_message_request_threshold -1?

This is our conf:

<system>
  workers <num_of_cpus>
</system>

<match common>
  @type elasticsearch
  reconnect_on_error true
  request_timeout 2147483648
  include_timestamp true
  http_backend typhoeus
  slow_flush_log_threshold 10s
  with_transporter_log true
  logstash_format true
  host elastic.example.com
  custom_headers {"Connection": "Keep-Alive"}
  log_es_400_reason true
  bulk_message_request_threshold -1
  <buffer>
    @type file
    retry_randomize false
    queue_limit_length 4
    flush_thread_count <num_of_cpus>
    retry_wait 5s
    chunk_limit_size 64M
    flush_interval 1s
    path /var/log/td-agent/buffer/common.elasticsearch.buffer
    retry_forever true
    overflow_action block
    retry_type periodic
  </buffer>
</match>

@g3kr commented Aug 18, 2021

@yashumitsu Thank you for the response. You suggest setting bulk_message_request_threshold to -1? Can you please explain what this does?

@chikinchoi (Author) commented Aug 19, 2021

@g3kr, I have upgraded fluent-plugin-elasticsearch to version 5.0.5 and added bulk_message_request_threshold -1 to the config, but I can still see the slow_flush warning message. Below is the match section config; is anything wrong?

<match **firelens**>
  @type copy
  @id firelens
  <store>
    @type elasticsearch
    @id firelens_es
    scheme https
    ssl_version TLSv1_2
    host  "#{ENV['ELASTIC_HOST']}"
    port  "#{ENV['ELASTIC_POST']}"
    user  "#{ENV['ELASTIC_USER']}"
    password "#{ENV['ELASTIC_PWD']}"
    ssl_verify false
    log_es_400_reason true
    logstash_format true
    logstash_prefix ${taskDef}
    logstash_dateformat %Y.%m
    reconnect_on_error true
    reload_on_failure true
    reload_connections false
    suppress_type_name true
    request_timeout 2147483648
    http_backend typhoeus
    sniffer_class_name "Fluent::Plugin::ElasticsearchSimpleSniffer"
    bulk_message_request_threshold -1
    <buffer taskDef>
      @type file
      flush_mode interval
      flush_interval 5s
      flush_thread_count 16
      total_limit_size 8GB
      chunk_limit_size 80MB
      overflow_action drop_oldest_chunk
      retry_max_interval 16s
      disable_chunk_backup true
      retry_forever false
      chunk_limit_records 5000
    </buffer>
    <metadata>
     include_chunk_id true
    </metadata>
  </store>
  <store>
    @type prometheus
    @id firelens_pro
    <metric>
      name fluentd_output_status_num_records_total_firelens
      type counter
      desc The total number of outgoing records firelens
      <labels>
        taskDef ${taskDef}
      </labels>
    </metric>
  </store>
</match>

@g3kr commented Aug 19, 2021

@chikinchoi It's difficult for me to reproduce the issue, hence I am not able to test the new parameter. Under what circumstances do you see this error/warning?

@renegaderyu commented Aug 19, 2021

@chikinchoi

I'm using 5.0.5 as well. Our team encountered slow buffer flush logs quite a few times, and the fix wasn't always the same, but it was usually related to tuning buffer parameters and the number of tasks/workers accepting data. In general, turning on debug logging and using the monitor_agent for fluent helped us hunt down the issues and see whether changes worked as expected.

Here are some of the things that I would try (a minimal config sketch for a few of these follows the list):

  1. Set @log_level debug on the elasticsearch output, or globally, to get details about the creation of buffer chunks and requests to ES.
  2. Use the monitor_agent fluent source to get metrics related to available buffer space. Even though you may have configured a large total_limit_size and there is plenty available, you may notice issues when you actually start using a significant portion of your buffers. We noticed issues when available space dipped below some arbitrary percentage, say 70%, and this was indicative of issues elsewhere.
  3. Try swapping file buffers for memory buffers. We saw a significant performance improvement in doing this, and our buffers are almost always empty (<1%), whereas with file-based buffers there was always some small percentage (1-7%) in use. The loss of chunk durability has so far proved to be worth it.
  4. Tune your chunk_limit_records and chunk_limit_size. For example, an avg. record size of 1500 bytes will result in chunks that are ~7.5MB (1500 * 5000), which is nowhere near the 80MB limit. Unless there is a specific reason for using chunk_limit_records, try dropping it and just letting chunk_limit_size dictate the request size to Elasticsearch.
  5. Try reducing request_timeout 2147483648 and using retries with exponential backoff.
  6. Set slow_flush_log_threshold to a larger value. If everything is performing fine, get your average buffer flush time and use that as your threshold.
  7. Try setting 1 worker per 1024 CPU units. I noticed you mention increasing "Fluentd CPU size from 1024 to 2048", but the first config you posted had 4 workers.
  8. Start thinking about using a stream/queueing mechanism (Kafka, Kinesis, etc.) that fluent can offload to quickly.
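For illustration, here is a minimal sketch of points 1, 3, 4, and 6 applied to an elasticsearch output (the match pattern, threshold, and buffer sizes are assumed placeholder values to tune for your own workload, not recommendations):

<match **firelens**>
  @type elasticsearch
  @log_level debug                  # point 1: log chunk creation and ES requests
  slow_flush_log_threshold 40s      # point 6: set above your observed average flush time
  # ... host/auth options unchanged ...
  <buffer taskDef>
    @type memory                    # point 3: memory buffer instead of file
    flush_interval 5s
    chunk_limit_size 8MB            # point 4: let size, not record count, cap the chunk
    total_limit_size 512MB
    flush_thread_count 8
    overflow_action drop_oldest_chunk
  </buffer>
</match>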

@g3kr commented Aug 23, 2021

@renegaderyu

Thanks for your suggestions here. I have a couple of follow-up questions:

If we reduce the chunk_limit_size to 10 MB, how does the flush to ES happen? I am assuming there will be too many chunks created that need to be flushed to ES.

How do we determine the optimal value for slow_flush_log_threshold?

Below is my buffer configuration

<store>
        @type elasticsearch
        host "#{ENV['ES_HOSTNAME']}"
        port 9243
        user "#{ENV['ES_USERNAME']}"
        password "#{ENV['ES_PASSWORD']}"
        # prevent duplicate log entries and updates
        id_key _hash
        #remove_keys _hash
        scheme https
        with_transporter_log true
        @log_level debug
        ssl_verify false
        ssl_version TLSv1_2
        index_name ${indexprefix}
        reconnect_on_error true
        reload_connections false
        reload_on_failure true
        suppress_type_name true
        request_timeout 30s
        prefer_oj_serializer true
        type_name _doc
        bulk_message_request_threshold -1
        #write_operation create
        <buffer indexprefix>
          @type "file"
          path "#{ENV['BufferPath']}"
          flush_thread_count 4
          #chunk+enqueue
          flush_mode interval
          flush_interval 30s
          chunk_limit_size 64MB
          total_limit_size 64GB
          flush_at_shutdown true
          overflow_action drop_oldest_chunk
          compress gzip
          #retry properties
          retry_forever false                        # Avoid retry indefinitely.
          retry_wait 20s                             # The wait interval for the first retry.
          retry_type exponential_backoff             # Set 'periodic' for constant intervals.
          retry_exponential_backoff_base 2           # Increases the wait time by a factor of 2.
          retry_max_interval 60s                     # Wait intervals are capped to 60s
          retry_max_times 3                    # Maximum retry count before giving up
        </buffer>
        </store>

what issues do you see with this?

@renegaderyu

@g3kr

If we reduce the chunk_limit_size to 10 MB, how does the flush to ES happen? I am assuming there will be too many chunks created that need to be flushed to ES.

As I understand it, the flush to ES still happens on an interval (based on your config), but it tries to send a bulk request to ES for each chunk. So, yes, it is likely that if you have too much data incoming, or an ES cluster that cannot support that many bulk requests, you will start seeing errors/retries. If you don't have enough flush threads to actually output data fast enough, you might just see the buffers continually grow. Some "napkin math" should help you get the right configuration to make sure fluent can flush as fast as data is coming in (factor in that buffers and flush threads are per worker). This, or debug metrics, should also help you determine the max buffer size used for your configured flush interval.
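As a worked example of that napkin math (the numbers are assumptions for illustration, not measurements from this thread): at roughly 2,000 records/s of ~1.5 KB each, ingest is about 3 MB/s, or ~45 MB per 15 s flush interval. With an 8 MB chunk_limit_size that is about 6 chunks, i.e. 6 bulk requests per flush, so the combination of flush_thread_count and per-request latency has to clear those ~6 chunks within each interval, or the buffer will keep growing.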

How do we determine the optimal value for slow_flush_log_threshold?

According to the docs, this is just "The threshold for chunk flush performance check." I think changing it pretty much just changes the point at which you consider a buffer flush to be slow. I'd try to set it to a p95/p99 value from a sample of debug logs taken when I know everything is performing well.

Below is my buffer configuration
...
what issues do you see with this?

Keep in mind, I don't know your ingest data rate or ES cluster size/details, but the things that jump out to me are @type "file", flush_interval, chunk_limit_size, flush_thread_count, and total_limit_size. Assuming you're not seeing a lot of errors/retries to ES right now, I'd start by decreasing the flush_interval and chunk_limit_size and increasing flush_thread_count to support that. I only mention total_limit_size because it is great to be able to buffer a lot in the event of an ES issue, but it's best to make sure you actually need that much buffer space. I'm biased against file buffers, so I'd recommend trying memory buffers. I see you've got with_transporter_log enabled as well; this should provide all the requests that fluent is sending to ES along with details to help determine whether changes are having a net positive/negative effect. This will also help you better set config values for things like request_timeout.
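As a concrete illustration of that direction (the values are assumptions to be tuned against your own ingest rate, not tested recommendations), the buffer section might look something like this:

<buffer indexprefix>
  @type memory                  # try memory instead of file buffers
  flush_mode interval
  flush_interval 10s            # shorter than the current 30s
  chunk_limit_size 16MB         # smaller chunks, so each bulk request stays modest
  total_limit_size 4GB          # only as much headroom as you actually need
  flush_thread_count 8          # more threads to keep up with the shorter interval
  overflow_action drop_oldest_chunk
</buffer>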

QQ: I see you're using compress gzip in the ES buffer. Does this plugin actually support compression? If not, you might want to try disabling it and trading some disk space for performance. If so, I need to go update my configs to use compression in our buffers, since I figured fluent had to decompress before passing data to this plugin. https://docs.fluentd.org/configuration/buffer-section

@g3kr commented Aug 24, 2021

@renegaderyu Thank you so much for the detailed response. This is very much helpful. I will look into how to tweak these parameters.

To your question on compression: we gzip the chunks to save some space in the buffers, but you are right that they get decompressed when written to ES.

Another thing I am trying is to include the monitor_agent input plugin to export fluentd metrics via REST API.

<source>
  @type monitor_agent
  bind 127.0.0.1
  port 24220
</source>

We run our service in AWS Fargate behind an NLB. I was hoping to reach the load balancer endpoint on port 24220 to get some metrics, but that doesn't seem to work. Do you have an idea of what I might be missing?

Again appreciate all your help in responding to the questions

@renegaderyu

We run our service in AWS Fargate behind an NLB. I was hoping to reach the load balancer endpoint on port 24220 to get some metrics, but that doesn't seem to work. Do you have an idea of what I might be missing?

Sorry, I can't really say what you're missing. I can say that we run something very similar. If you're using multiple tasks, or autoscaling, you won't be able to associate the data from monitor_agent with the task behind the load balancer. We just opted to use emit_interval in the monitor_agent and then label, transform, and route those logs. It's less exposed from a security standpoint and easy to turn up/down by changing LOG_LEVEL and triggering a deployment.

Something like this:

  <source>
    @type monitor_agent
    @log_level "#{ENV['LOG_LEVEL']}"
    @label @INTERNALMETRICS
    tag "monitor.#{ENV['SOME_USEFUL_ENV_VALUE']}"
    emit_interval 60
  </source>
  <label @INTERNALMETRICS>
    <filter monitor.**>
      @type record_transformer
      <record>
        some_meaningful_field "#{ENV['SOME_OTHER_USEFUL_ENV_VALUE']}"
      </record>
    </filter>
    <match monitor.**>
      @type stdout
    </match>
  </label>

@g3kr commented Aug 24, 2021

@renegaderyu I will try this configuration and route the logs to stdout. Thanks again.

It's less exposed from a security standpoint and easy to turn up/down by changing LOG_LEVEL and triggering a deployment.

I did not quite understand this. How do you turn this up or down with LOG_LEVEL? If I may also ask, what is the purpose of adding a record_transformer here?

@renegaderyu

@g3kr LOG_LEVEL is sourced from an environment variable. If you're using Fargate/ECS, environment variables are defined in the task definition for the service, and changing them will not affect running fluentd processes. You have to kill the task and let a new one spin up, trigger a new deployment for the service, or something else to make fluentd pick up the new value for the env var.

The record_transformer is just there to add extra fields/data to the metrics. For instance, you can add the container/config version to the monitor_agent events so it's easy to compare differences if you were to use a rolling or canary type of deployment. I just provided it as an example; it can be removed if you don't need it.

@g3kr commented Aug 24, 2021

@renegaderyu Makes sense. Thank you so much!

@g3kr commented Aug 24, 2021

@renegaderyu After implementing the monitor_agent input plugin, this is the sample event I see for my elasticsearch output plugin.

{
  "plugin_id": "es_output",
  "plugin_category": "output",
  "type": "elasticsearch",
  "output_plugin": true,
  "buffer_queue_length": 0,
  "buffer_timekeys": [],
  "buffer_total_queued_size": 15228,
  "retry_count": 0,
  "emit_records": 24392,
  "emit_count": 4996,
  "write_count": 243,
  "rollback_count": 0,
  "slow_flush_count": 0,
  "flush_time_count": 137999,
  "buffer_stage_length": 2,
  "buffer_stage_byte_size": 15228,
  "buffer_queue_byte_size": 0,
  "buffer_available_buffer_space_ratios": 100,
  "TaskID": "a22e8d86b88a431e989ghfb7e2fbb26c"
}

I wasn't sure whether this looks right from a metrics perspective. emit_records, emit_count, and write_count have different values. Is that expected? Do they have to match?

I couldn't find documentation on how to interpret these numbers. Can you please enlighten me on this? Thanks

@cosmo0920 (Collaborator) commented Aug 25, 2021

Hi,

I wasn't sure whether this looks right from a metrics perspective. emit_records, emit_count, and write_count have different values. Is that expected? Do they have to match?

They don't have to match each other.

emit_records represents the number of emitted records.
emit_count represents the number of calls to the emit_sync or emit_buffered method, which are in the base class of the output plugin.
write_count represents the number of flushes. In your setup, this metric represents the number of times logs are actually sent to Elasticsearch.

<[[record1], [record2], ..., [recordN]]> ---> each record is counted by emit_records, and the emitted group of records is counted by emit_count.

Write operations are sometimes batched, and write_count represents the number of these batched chunks.

[
  [[record11], [record12], ..., [record1M]],
  [[record21], [record22], ..., [record2M]],
  ...,
  [[recordN1], [recordN2], ..., [recordNM]],
]

@g3kr commented Aug 25, 2021

@cosmo0920 Thanks for clarifying this. Much appreciated.

Based on your definitions of these metrics, I am wondering how one would make use of them in times of anomalies.

As per my understanding, unless your buffer_queue_length becomes > 0 or your retries increase, there is no way to tell that things are failing.

Please let me know if I am wrong.

@cosmo0920 (Collaborator)

Based on your definitions of these metrics, I am wondering how one would make use of them in times of anomalies.

As per my understanding, unless your buffer_queue_length becomes > 0 or your retries increase, there is no way to tell that things are failing.

Yes, your understanding is almost correct.
For detecting anomalies, you might also want to check slow_flush_count to detect slowdowns in emission.

@g3kr commented Sep 2, 2021

@renegaderyu @cosmo0920

For the below config

<buffer indexprefix>
          @type "file"
          path "#{ENV['BufferPath']}"
          flush_thread_count 2
          #chunk+enqueue
          flush_mode interval
          flush_interval 30s
          chunk_limit_size 16MB
          total_limit_size 8GB
          flush_at_shutdown true
          overflow_action drop_oldest_chunk
          compress gzip
          #retry properties
          retry_forever false                        # Avoid retry indefinitely.
          retry_wait 20s                             # The wait interval for the first retry.
          retry_type exponential_backoff             # Set 'periodic' for constant intervals.
          retry_exponential_backoff_base 2           # Increases the wait time by a factor of 2.
          retry_max_interval 60s                     # Wait intervals are capped to 60s
          retry_max_times 1                   # Maximum retry count before giving up
        </buffer>

We observed something strange: although I have retry_max_times set to 1, I saw the following in our logs, which caused a Faraday::TimeoutError.

2021-09-02 20:03:51 +0000 [warn]: #0 [es_output] failed to flush the buffer. retry_time=0 next_retry_seconds=2021-09-02 20:04:11.661884333 +0000 chunk="5cb089f6964324f13ccc8d321dfb6c83" error_class=Fluent::Plugin::ElasticsearchOutput::RecoverableRequestFailure error="could not push logs to Elasticsearch cluster ({:host=>\"es.us-east-1.aws.found.io\", :port=>9243, :scheme=>\"https\", :user=>\"g3kr\", :password=>\"obfuscated\"}): Connection reset by peer (Errno::ECONNRESET)"

Any idea/thought on what might be going on here? When I filtered for chunk id "5cb089f6964324f13ccc8d321dfb6c83", I saw the retries below, while I would have expected it to retry only once and then drop the chunk. See the attached screenshot.

[Screenshot: repeated retry log entries for chunk 5cb089f6964324f13ccc8d321dfb6c83]

@renegaderyu

@g3kr I've never seen that, but it appears that "taking back chunk for errors" only occurs in the try_flush method of the fluentd codebase:
https://github.com/fluent/fluentd/blob/master/lib/fluent/plugin/output.rb#L1141-L1238 (see line 1223). I can't grok Ruby that well, but it appears to relate to the UNRECOVERABLE_ERRORS types. Maybe it's something where the attempt to flush your buffers can't even connect properly to make the request, so it reclaims the chunk. You only shared the buffer settings, but this seems more likely to be a result of your output configuration.

The troubleshooting doc for this plugin, https://github.com/uken/fluent-plugin-elasticsearch/blob/master/README.Troubleshooting.md, mentions turning on transporter logs to get more info, and potentially setting the ssl_version.

@g3kr commented Sep 3, 2021

@renegaderyu Thanks for responding. This is my entire output config.

<match kubeMsgs.** >
        @id es_output
        @type elasticsearch
        host "#{ENV['ES_HOSTNAME']}"
        port 9243
        user "#{ENV['ES_USERNAME']}"
        password "#{ENV['ES_PASSWORD']}"
        # prevent duplicate log entries and updates
        id_key _hash
        #remove_keys _hash
        scheme https
        with_transporter_log true
        @log_level debug
        ssl_verify false
        ssl_version TLSv1_2
        index_name ${indexprefix}
        reconnect_on_error true
        reload_connections false
        reload_on_failure true
        suppress_type_name true
        request_timeout 60s
        prefer_oj_serializer true
        type_name _doc
        bulk_message_request_threshold -1
        slow_flush_log_threshold 30.0
        #write_operation create
        <buffer indexprefix>
          @type "file"
          path "#{ENV['BufferPath']}"
          flush_thread_count 4
          #chunk+enqueue
          flush_mode interval
          flush_interval 15s
          chunk_limit_size 8MB
          total_limit_size 8GB
          flush_at_shutdown true
          overflow_action drop_oldest_chunk
          #retry properties
          retry_wait 10
          retry_timeout 2m
          retry_type exponential_backoff          
          retry_max_interval 60
          retry_max_times 1    
          disable_chunk_backup true
        </buffer>
    </match>

@g3kr commented Sep 4, 2021

@renegaderyu It would be helpful if you could answer one more question. Is there a way to look at the contents of the buffer file to view the chunks it will process? I tried to open one and it looks encrypted. Please let me know. Thank you.

@renegaderyu

@g3kr I just meant, from the info provided, that I'm not convinced the "taking back chunk for errors" problem is a result of the buffer config. It could be the output, the destination, a proxy, or the network in between. Since this issue is originally about slow buffer flushes, it might be worth opening a new issue, detailing all your configs, and including relevant logs.

@renegaderyu It would be helpful if you could answer one more question. Is there a way to look at the contents of the buffer file to view the chunks it will process? I tried to open one and it looks encrypted. Please let me know. Thank you.

I've peeked at some buffer chunks before, and IIRC they are not encrypted, but they do seem to use characters/bytes that do not print well as field/log separators. You could probably look at the fluentd source to figure out the format and write a quick script to help parse them. That said, I suspect the usual fluentd debug logs, transporter logs, and emissions from monitor_agent should be sufficient to help you track down any problems.

@chikinchoi (Author) commented Oct 26, 2021

@cosmo0920 @renegaderyu
Thanks for your suggestions, but I have a few questions about the chunk_limit_size parameter.
I have set chunk_limit_size to 80MB because I want Fluentd to always flush based on chunk_limit_records 5000 or flush_interval 5s.
For the chunk_limit_size parameter, if I set chunk_limit_size to 8MB, what will happen when a single record is larger than 8MB? And what will happen if there are already 7MB in the chunk and an incoming record is 2MB?
Also, I found that sometimes the buffer available space ratio is decreasing even though the number of incoming records is not very high. Do you think it is caused by chunk_limit_size 80MB? Is it possible that it is because there are a lot of large records in the chunk and not enough to reach 5000 records within 5s?

[Screenshots: graphs of buffer available space ratio and incoming records]

@renegaderyu

I have set chunk_limit_size to 80MB because I want Fluentd to always flush based on chunk_limit_records 5000 or flush_interval 5s.

Is there a particular reason for this? I came across this blog post when I was tuning and it was tremendously helpful. My experience has been pretty much the same. It seems the best throughput is achieved by having chunks appropriately, and consistently, sized and flushed. This is why I mentioned removing chunk_limit_records in a prior response. Limiting by the number of records in a chunk will produce varying chunk sizes if your records are not all very consistent in size.
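For example, a buffer block driven purely by size and interval (the sizes here are illustrative assumptions, not values tested against this workload) would drop chunk_limit_records entirely:

<buffer taskDef>
  @type file
  flush_mode interval
  flush_interval 5s
  chunk_limit_size 16MB        # size alone decides when a chunk is full
  # no chunk_limit_records, so chunk sizes stay consistent even if record sizes vary
  total_limit_size 8GB
  flush_thread_count 16
  overflow_action drop_oldest_chunk
</buffer>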

For the chunk_limit_size parameter, if I set chunk_limit_size to 8MB, what will happen when a single record is larger than 8MB? And what will happen if there are already 7MB in the chunk and an incoming record is 2MB?

I think a single record larger than chunk_limit_size will throw an error, but I'm not 100% sure. For the second scenario, I would expect a new chunk to be created.

Also, I found that sometimes the buffer available space ratio is decreasing even though the number of incoming records is not very high.

I saw something similar while using file-based buffers (it's why I'm now biased against them). After some unknown threshold, the buffers would just continue to grow. I knew, based on back-of-the-envelope math, that ES could handle the ingestion rate and that fluentd should have been able to flush faster than the incoming data rate. The tasks didn't appear to be maxing out CPU, IOPS, I/O, or exhausting inodes on the SSDs. They were in an ASG and load balancing seemed fine. After changing to memory buffers I never saw this again.

Do you think it is caused by chunk_limit_size 80MB? Is it possible that it is because there are a lot of large records in the chunk and not enough to reach 5000 records within 5s?

I can't say.

  • Can you map total incoming records/sec (not by tag/instance) alongside those other 2 graphs?
  • Is that the max for all tasks/instances? View it split up by task/instance to see if it's isolated or across the board.
  • Same for the buffer available space ratio...

@chikinchoi (Author)

@renegaderyu

I updated the chunk_limit_size to 20MB and everything seems great!
However, I still get a lot of "buffer flush took longer time than slow_flush_log_threshold: elapsed_time=....". One strange thing I found is that the elapsed_time in the error message is mostly 9xx seconds. Although I see a lot of these error logs, I cannot find any delay in the logs arriving in Elasticsearch. Do you think a slow flush error with 9xx seconds is just a false alarm?

@renegaderyu

@chikinchoi, it's not a false alarm; it just needs to be tuned for your systems. The slow_flush_log_threshold is set to a generic default of 20s. I increased that value after confirming that the ingest delay was acceptable and ES was performing/ingesting fine.

As I understand Fluentd to work, each flush attempt tries to flush the entire buffer, all chunks. In your case, flush_interval is 15s. I don't know the rate of incoming data, but the buffer is composed of N chunks with size = 20 MB. IIRC, each chunk results in a single HTTPS bulk request to Elasticsearch, and N of those requests need to complete within 20s to fully flush the buffer and avoid logging that line. I can't recall whether it was debug logging on this plugin or enabling the transporter logs, but one of them let me view all the requests made to ES during a flush, including metrics like request size, time to complete, and chunks in the buffer.

If fluent seems fine and ES seems fine, I'd decrease the flush_interval to 10s. If your systems are near their limits, then I'd recommend increasing slow_flush_log_threshold.
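To make those two suggestions concrete, a minimal sketch of the relevant output settings (the 10s and 40s values are illustrative starting points, not tested recommendations):

<match **firelens**>
  @type elasticsearch
  # ... connection options unchanged ...
  slow_flush_log_threshold 40s   # raise only after confirming the ingest delay is acceptable
  <buffer taskDef>
    @type file
    flush_mode interval
    flush_interval 10s           # flush more often so each bulk request stays small
    chunk_limit_size 20MB
    flush_thread_count 16
  </buffer>
</match>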
