Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure parser returns hash #4474

Merged
merged 4 commits into from Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/fluent/plugin/filter_parser.rb
Expand Up @@ -79,6 +79,10 @@ def filter_with_time(tag, time, record)
end
@accessor.delete(record) if @remove_key_name_field
r = handle_parsed(tag, record, t, values)
# Note: https://github.com/fluent/fluentd/issues/4100
# If the parser returns multiple records from one raw_value,
# this returns only the first one record.
# This should be fixed in the future version.
return t, r
else
if @emit_invalid_record_to_error
Expand Down
67 changes: 15 additions & 52 deletions lib/fluent/plugin/in_http.rb
Expand Up @@ -203,54 +203,24 @@ def on_request(path_info, params)
begin
path = path_info[1..-1] # remove /
tag = path.split('/').join('.')
record_time, record = parse_params(params)

# Skip nil record
if record.nil?
log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" }
if @respond_with_empty_img
return RESPONSE_IMG
else
if @use_204_response
return RESPONSE_204
else
return RESPONSE_200
end
mes = Fluent::MultiEventStream.new
parse_params(params) do |record_time, record|
if record.nil?
log.debug { "incoming event is invalid: path=#{path_info} params=#{params.to_json}" }
next
end
end

mes = nil
# Support batched requests
if record.is_a?(Array)
mes = Fluent::MultiEventStream.new
record.each do |single_record|
add_params_to_record(single_record, params)

if param_time = params['time']
param_time = param_time.to_f
single_time = param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
elsif @custom_parser
single_time = @custom_parser.parse_time(single_record)
single_time, single_record = @custom_parser.convert_values(single_time, single_record)
else
single_time = convert_time_field(single_record)
end

mes.add(single_time, single_record)
end
else
add_params_to_record(record, params)

time = if param_time = params['time']
param_time = param_time.to_f
param_time.zero? ? Fluent::EventTime.now : @float_time_parser.parse(param_time)
else
if record_time.nil?
convert_time_field(record)
else
record_time
end
record_time.nil? ? convert_time_field(record) : record_time
end

mes.add(time, record)
end
rescue => e
if @dump_error_log
Expand All @@ -261,11 +231,7 @@ def on_request(path_info, params)

# TODO server error
begin
if mes
router.emit_stream(tag, mes)
else
router.emit(tag, time, record)
end
router.emit_stream(tag, mes) unless mes.empty?
rescue => e
if @dump_error_log
log.error "failed to emit data", error: e
Expand Down Expand Up @@ -308,31 +274,28 @@ def on_server_connect(conn)
def parse_params_default(params)
if msgpack = params['msgpack']
@parser_msgpack.parse(msgpack) do |_time, record|
return nil, record
yield nil, record
end
elsif js = params['json']
@parser_json.parse(js) do |_time, record|
return nil, record
yield nil, record
end
elsif ndjson = params['ndjson']
events = []
ndjson.split(/\r?\n/).each do |js|
@parser_json.parse(js) do |_time, record|
events.push(record)
yield nil, record
end
end
return nil, events
else
raise "'json', 'ndjson' or 'msgpack' parameter is required"
end
end

def parse_params_with_parser(params)
if content = params[EVENT_RECORD_PARAMETER]
@custom_parser.parse(content) { |time, record|
raise "Received event is not #{@format_name}: #{content}" if record.nil?
return time, record
}
@custom_parser.parse(content) do |time, record|
yield time, record
end
else
raise "'#{EVENT_RECORD_PARAMETER}' parameter is required"
end
Expand Down
27 changes: 22 additions & 5 deletions lib/fluent/plugin/parser_json.rb
Expand Up @@ -70,16 +70,33 @@ def configure_json_parser(name)
end

def parse(text)
record = @load_proc.call(text)
time = parse_time(record)
if @execute_convert_values
ashie marked this conversation as resolved.
Show resolved Hide resolved
time, record = convert_values(time, record)
parsed_json = @load_proc.call(text)

if parsed_json.is_a?(Hash)
time, record = parse_one_record(parsed_json)
yield time, record
elsif parsed_json.is_a?(Array)
parsed_json.each do |record|
unless record.is_a?(Hash)
yield nil, nil
next
end
time, parsed_record = parse_one_record(record)
yield time, parsed_record
end
else
yield nil, nil
end
yield time, record

rescue @error_class, EncodingError # EncodingError is for oj 3.x or later
yield nil, nil
end

def parse_one_record(record)
time = parse_time(record)
convert_values(time, record)
end

def parser_type
:text
end
Expand Down
27 changes: 24 additions & 3 deletions lib/fluent/plugin/parser_msgpack.rb
Expand Up @@ -31,18 +31,39 @@ def parser_type
:binary
end

def parse(data)
def parse(data, &block)
@unpacker.feed_each(data) do |obj|
yield convert_values(parse_time(obj), obj)
parse_unpacked_data(obj, &block)
end
end
alias parse_partial_data parse

def parse_io(io, &block)
u = Fluent::MessagePackFactory.engine_factory.unpacker(io)
u.each do |obj|
time, record = convert_values(parse_time(obj), obj)
parse_unpacked_data(obj, &block)
end
end

def parse_unpacked_data(data)
if data.is_a?(Hash)
time, record = convert_values(parse_time(data), data)
yield time, record
return
end

unless data.is_a?(Array)
yield nil, nil
return
end

data.each do |record|
unless record.is_a?(Hash)
yield nil, nil
next
end
time, converted_record = convert_values(parse_time(record), record)
yield time, converted_record
end
end
end
Expand Down
106 changes: 106 additions & 0 deletions test/plugin/test_parser_json.rb
Expand Up @@ -135,4 +135,110 @@ def test_yajl_parse_io_with_buffer_smaller_than_input
end
end
end

sub_test_case "various record pattern" do
data("Only string", { record: '"message"', expected: [nil] }, keep: true)
data("Only string without quotation", { record: "message", expected: [nil] }, keep: true)
data("Only number", { record: "0", expected: [nil] }, keep: true)
data(
"Array of Hash",
{
record: '[{"k1": 1}, {"k2": 2}]',
expected: [{"k1" => 1}, {"k2" => 2}]
},
keep: true,
)
data(
"Array of both Hash and invalid",
{
record: '[{"k1": 1}, "string", {"k2": 2}, 0]',
expected: [{"k1" => 1}, nil, {"k2" => 2}, nil]
},
keep: true,
)
data(
"Array of all invalid",
{
record: '["string", 0, [{"k": 0}]]',
expected: [nil, nil, nil]
},
keep: true,
)

def test_oj(data)
parsed_records = []
@parser.configure("json_parser" => "oj")
@parser.instance.parse(data[:record]) { |time, record|
parsed_records.append(record)
}
assert_equal(data[:expected], parsed_records)
end

def test_yajl(data)
parsed_records = []
@parser.configure("json_parser" => "yajl")
@parser.instance.parse(data[:record]) { |time, record|
parsed_records.append(record)
}
assert_equal(data[:expected], parsed_records)
end

def test_json(json)
parsed_records = []
@parser.configure("json_parser" => "json")
@parser.instance.parse(data[:record]) { |time, record|
parsed_records.append(record)
}
assert_equal(data[:expected], parsed_records)
end
end

# This becomes NoMethodError if a non-Hash object is passed to convert_values.
# https://github.com/fluent/fluentd/issues/4100
sub_test_case "execute_convert_values with null_empty_string" do
data("Only string", { record: '"message"', expected: [nil] }, keep: true)
data(
"Hash",
{
record: '{"k1": 1, "k2": ""}',
expected: [{"k1" => 1, "k2" => nil}]
},
keep: true,
)
data(
"Array of Hash",
{
record: '[{"k1": 1}, {"k2": ""}]',
expected: [{"k1" => 1}, {"k2" => nil}]
},
keep: true,
)

def test_oj(data)
parsed_records = []
@parser.configure("json_parser" => "oj", "null_empty_string" => true)
@parser.instance.parse(data[:record]) { |time, record|
parsed_records.append(record)
}
assert_equal(data[:expected], parsed_records)
end

def test_yajl(data)
parsed_records = []
@parser.configure("json_parser" => "yajl", "null_empty_string" => true)
@parser.instance.parse(data[:record]) { |time, record|
parsed_records.append(record)
}
assert_equal(data[:expected], parsed_records)
end

def test_json(json)
parsed_records = []
@parser.configure("json_parser" => "json", "null_empty_string" => true)
@parser.instance.parse(data[:record]) { |time, record|
parsed_records.append(record)
}
assert_equal(data[:expected], parsed_records)
end
end
end