OpenSearchOutputDataStream does not provide error handling #123

robinsillem opened this issue Nov 20, 2023 · 0 comments

(check all that apply)

  • [x] Read the contribution guideline
  • (optional) Already reported to the 3rd party upstream repository or mailing list if you use the k8s addon or helm charts.

Problem

OpenSearchOutputDataStream does not provide handling for cases where OpenSearch returns errors in its response to bulk inserts. Such handling is present in OpenSearchOutput.

This is a problem because the offending log message is lost, with no possibility for fluentd to (for instance) send it to some other form of dead letter queue storage for subsequent offline processing.
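
For illustration only, here is a minimal sketch of the kind of handling we would like. The method name handle_bulk_errors, the chunk_events array and the exact response handling are assumptions, not the plugin's actual internals; it assumes the standard bulk response shape ("errors" flag plus per-item "status"/"error") and fluentd's router.emit_error_event, which is what routes records to <label @ERROR>:

  # Sketch only: assumed to live inside the output plugin class, where `router`
  # (event_emitter helper) is available. All names here are illustrative.
  def handle_bulk_errors(tag, chunk_events, response)
    return unless response['errors']

    response['items'].each_with_index do |item, idx|
      op = item.values.first              # e.g. the "create" entry for data stream writes
      next if op['status'].to_i < 400     # only route documents that actually failed

      time, record = chunk_events[idx]    # assumes one bulk item per event, in original order
      error = RuntimeError.new("bulk item failed: #{op['error']}")
      # Emit to the @ERROR label instead of silently dropping the record
      router.emit_error_event(tag, time, record, error)
    end
  end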

Steps to replicate

Config
  <match **>
    @type opensearch_data_stream
    @log_level ${log_level}

    host "#{ENV['FLUENT_ELASTICSEARCH_HOST']}"
    port "#{ENV['FLUENT_ELASTICSEARCH_PORT']}"
    user "#{ENV['FLUENT_ELASTICSEARCH_USER']}"
    password "#{ENV['FLUENT_ELASTICSEARCH_PASSWORD']}"
    scheme "#{ENV['FLUENT_ELASTICSEARCH_SCHEME'] || 'http'}"
    ssl_verify "#{ENV['FLUENT_ELASTICSEARCH_SSL_VERIFY'] || 'true'}"

    data_stream_name e.$${tag}.%Y.%W
    include_timestamp true

    default_opensearch_version 2
    fail_on_detecting_os_version_retry_exceed false
    max_retry_get_os_version 2
    verify_os_version_at_startup true

    include_tag_key true
    log_os_400_reason true
    reconnect_on_error true
    reload_after 100
    reload_connections true
    sniffer_class_name Fluent::Plugin::OpenSearchSimpleSniffer
    reload_on_failure true
    request_timeout 60s
    resurrect_after 60s
    suppress_type_name true

    with_transporter_log false # debug flushes

    <buffer tag, time>
        @type file
        timekey 5
        path /var/log/fluent/opensearch
        flush_mode immediate
        retry_type exponential_backoff
        flush_thread_count 12
        retry_forever true
        retry_max_interval 30
        total_limit_size ${fluentd_size}
        overflow_action block
    </buffer>
  </match>

  # This was added to provide DLQ functionality for logs that OS cannot parse (unrecoverable errors), sending them to S3
  # It also catches any error events emitted by plugins in the fluentd pipeline (requires emit_error_label_event=true)
  <label @ERROR>
    <match **>
      @type s3
      s3_bucket ${s3_access_point_alias}
      s3_region eu-west-2
      path errors/$${tag}/%Y/%m/%d/
      s3_object_key_format %%{path}%%{time_slice}_%%{index}.%%{file_extension}
      store_as text
      <buffer tag,time>
        @type file
        path /var/log/fluent/s3
        timekey 60 # 1 minute partition
        timekey_wait 10s
        timekey_use_utc true # use utc
      </buffer>
    </match>
  </label>
Message

This is a heavily redacted copy of a log message containing a field which OpenSearch attempts and fails to parse as a date, leading to a 400 response.

{
  "request": {
    "headers": {
      "taxyearexplicit": "2018-19"
    }
  }
}
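
As a reproduction aid (not part of the original setup), one way to inject an equivalent event into a running fluentd is via the fluent-logger gem, assuming an in_forward <source> listening on the default port 24224; the tag used here is arbitrary:

  # Assumes the fluent-logger gem and an in_forward <source> on localhost:24224.
  require 'fluent-logger'

  logger = Fluent::Logger::FluentLogger.new(nil, host: 'localhost', port: 24224)
  logger.post('app.test', {
    'request' => { 'headers' => { 'taxyearexplicit' => '2018-19' } }
  })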

Expected Behavior or What you need to ask

I expect this message to trigger an error label event from within the plugin, which can then be picked up by the <label @ERROR> block above, allowing us to configure further action (such as the S3 dead letter queue) ourselves.

Instead, what we see is a log line from fluentd noting that "Could not bulk insert to Data Stream: #{data_stream_name} #{response}", but the original log event has been dropped. This means we are losing logs. While in this case the root cause may be considered to lie with OpenSearch, we still need a general fallback for unprocessable messages.
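
For context, here is a hedged sketch of where such an emit could happen, assuming the data stream write path checks the bulk response roughly as the quoted log line suggests; handle_bulk_errors is the illustrative helper sketched in the Problem section above, not existing plugin code:

  # Sketch only, not the plugin's actual code.
  if response['errors']
    log.error "Could not bulk insert to Data Stream: #{data_stream_name} #{response}"
    # Desired addition: route each failed document to <label @ERROR> instead of dropping it
    handle_bulk_errors(tag, chunk_events, response)
  end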

Using Fluentd and OpenSearch plugin versions

  • OS version: Alpine Linux v3.16 (container image running in EKS)
  • Bare Metal or within Docker or Kubernetes or others? : EKS
  • Fluentd v1.0 or later : fluentd 1.15.2
  • OpenSearch plugin version : 'fluent-plugin-opensearch' version '1.1.4'
  • OpenSearch version (optional)
  • OpenSearch template(s) (optional)