Skip to content

Commit

Permalink
fix 208. Properly handle unix time value (#275)
Browse files Browse the repository at this point in the history
  • Loading branch information
jcantrill committed Jan 25, 2021
1 parent a4449e0 commit 612a5c7
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 15 deletions.
17 changes: 2 additions & 15 deletions lib/fluent/plugin/filter_kubernetes_metadata.rb
Expand Up @@ -22,6 +22,7 @@
require_relative 'kubernetes_metadata_cache_strategy'
require_relative 'kubernetes_metadata_common'
require_relative 'kubernetes_metadata_stats'
require_relative 'kubernetes_metadata_util'
require_relative 'kubernetes_metadata_watch_namespaces'
require_relative 'kubernetes_metadata_watch_pods'

Expand All @@ -35,6 +36,7 @@ class KubernetesMetadataFilter < Fluent::Plugin::Filter

include KubernetesMetadata::CacheStrategy
include KubernetesMetadata::Common
include KubernetesMetadata::Util
include KubernetesMetadata::WatchNamespaces
include KubernetesMetadata::WatchPods

Expand Down Expand Up @@ -294,21 +296,6 @@ def get_metadata_for_record(namespace_name, pod_name, container_name, container_
metadata
end

def create_time_from_record(record, internal_time)
time_key = @time_fields.detect { |ii| record.key?(ii) }
time = record[time_key]
if time.nil? || time.chop.empty?
# `internal_time` is a Fluent::EventTime, it can't compare with Time.
return Time.at(internal_time.to_f)
end

if ['_SOURCE_REALTIME_TIMESTAMP', '__REALTIME_TIMESTAMP'].include?(time_key)
timei = time.to_i
return Time.at(timei / 1_000_000, timei % 1_000_000)
end
Time.parse(time)
end

def filter_stream(tag, es)
return es if (es.respond_to?(:empty?) && es.empty?) || !es.is_a?(Fluent::EventStream)

Expand Down
40 changes: 40 additions & 0 deletions lib/fluent/plugin/kubernetes_metadata_util.rb
@@ -0,0 +1,40 @@
# frozen_string_literal: true

#
# Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with
# Kubernetes metadata
#
# Copyright 2021 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module KubernetesMetadata
module Util
def create_time_from_record(record, internal_time)
time_key = @time_fields.detect { |ii| record.key?(ii) }
time = record[time_key]
if time.nil? || time.is_a?(String) && time.chop.empty?
# `internal_time` is a Fluent::EventTime, it can't compare with Time.
return Time.at(internal_time.to_f)
end

if ['_SOURCE_REALTIME_TIMESTAMP', '__REALTIME_TIMESTAMP'].include?(time_key)
timei = time.to_i
return Time.at(timei / 1_000_000, timei % 1_000_000)
end
return Time.at(time) if time.is_a?(Numeric)

Time.parse(time)
end
end
end
56 changes: 56 additions & 0 deletions test/plugin/test_utils.rb
@@ -0,0 +1,56 @@
# frozen_string_literal: true

#
# Fluentd Kubernetes Metadata Filter Plugin - Enrich Fluentd events with
# Kubernetes metadata
#
# Copyright 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
class KubernetesMetadataCacheStatsTest < Test::Unit::TestCase
include KubernetesMetadata::Util

def setup
@time_fields = ['time']
@internal_time = Time.now
end

test '#create_time_from_record when time is empty' do
record = { 'time' => ' ' }
assert_equal(@internal_time.to_i, create_time_from_record(record, @internal_time).to_i)
end
test '#create_time_from_record when time is nil' do
record = {}
assert_equal(@internal_time.to_i, create_time_from_record(record, @internal_time).to_i)
end

test '#create_time_from_record when time is an integer' do
exp_time = Time.now
record = { 'time' => exp_time.to_i }
assert_equal(exp_time.to_i, create_time_from_record(record, @internal_time).to_i)
end

test '#create_time_from_record when time is a string' do
exp_time = Time.now
record = { 'time' => exp_time.to_s }
assert_equal(exp_time.to_i, create_time_from_record(record, @internal_time).to_i)
end

test '#create_time_from_record when timefields include journal time fields' do
@time_fields = ['_SOURCE_REALTIME_TIMESTAMP']
exp_time = Time.now
record = { '_SOURCE_REALTIME_TIMESTAMP' => exp_time.to_i.to_s }
assert_equal(Time.at(exp_time.to_i / 1_000_000, exp_time.to_i % 1_000_000).to_i, create_time_from_record(record, @internal_time).to_i)
end
end

0 comments on commit 612a5c7

Please sign in to comment.