Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sources): Add http source using Warp #1650

Merged
merged 16 commits into from
Mar 2, 2020

Conversation

gedkins
Copy link
Contributor

@gedkins gedkins commented Jan 31, 2020

This addresses #328.

This is based off the logplex source. Code that may be reused by other http source plugins has been separated out into a util/http.rs.

Supports text, json and ndjson body.

Signed-off-by: Giles Edkins gedkins@bluecatnetworks.com

This is based off the logplex source. Code that may be reused by other http source plugins has been separated out into a util/http.rs.

Supports text, json and ndjson body.

Signed-off-by: Giles Edkins <gedkins@bluecatnetworks.com>
@leshow
Copy link
Contributor

leshow commented Jan 31, 2020

Hey! I got a notification for this, I hope you don't mind if I nitpick a few random things.

src/sources/http.rs Outdated Show resolved Hide resolved
src/sources/http.rs Outdated Show resolved Hide resolved
fn json_parse_array_of_object(value: JsonValue) -> Result<Vec<Event>, ErrorMessage> {
match value {
JsonValue::Array(v) => {
let mut out_vec = vec![];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        JsonValue::Array(v) => Ok(v
            .into_iter()
            .map(json_parse_object)
            .collect::<Result<Vec<Event>, _>>()?),

There's a FromIterator<Result<A, E>> that makes this collect() call work. Kinda like traverse from Haskell. I didn't run any tests but I think this says the same thing.

@binarylogic
Copy link
Contributor

Thanks! @lukesteensen is the best person to review this since he built the logplex source.

Signed-off-by: Giles Edkins <gedkins@bluecatnetworks.com>
@binarylogic binarylogic requested review from Hoverbear and removed request for lukesteensen February 3, 2020 21:01
@binarylogic binarylogic assigned Hoverbear and unassigned lukesteensen Feb 3, 2020
}

fn json_error() -> ErrorMessage {
ErrorMessage::new(StatusCode::BAD_REQUEST, "Bad JSON".to_string())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should support some error context? Eg "Expected Array or Object, got Number"

}

fn json_parse(json_str: String) -> Result<JsonValue, ErrorMessage> {
serde_json::from_str(&json_str).map_err(|_| json_error())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if this function's purpose warrants it's existence. If all it does is throw away error context from from_str maybe we can just remove it entirely?

src/sources/http.rs Outdated Show resolved Hide resolved
)
.and_then(|events| {
out.send_all(futures::stream::iter_ok(events)).map_err(
move |_: mpsc::SendError<Event>| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any context we can get from this error? It'd be sad to throw it away.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We sort of can get more context.
We can log the event it was trying to send when it failed (which is a good idea), but the only reason it would fail is if the receiver is dropped, and the error doesn't give any more insight than that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, fair!

@Hoverbear
Copy link
Contributor

Once you're feeling this is ready to go code-wise we should also add documentation! You can see what that would look like here: https://github.com/timberio/vector/pull/1264/files#diff-d97eef340a70cc48bfd4c26e66a12780

Bill Bateman added 2 commits February 4, 2020 16:02
Signed-off-by: Bill Bateman <bbateman@bluecatnetworks.com>
Signed-off-by: Bill Bateman <bbateman@bluecatnetworks.com>
Signed-off-by: Bill Bateman <bbateman@bluecatnetworks.com>
@gedkins gedkins changed the title feat(http source): add http source using Warp feat(http source): Add http source using Warp Feb 5, 2020
@gedkins gedkins changed the title feat(http source): Add http source using Warp feat(sources): Add http source using Warp Feb 5, 2020
@binarylogic binarylogic added the needs: docs Needs documentation updates label Feb 9, 2020
@bill-bateman
Copy link
Contributor

Just checking in if you are waiting on anything from us.

Is the documentation that we added enough?

@Hoverbear
Copy link
Contributor

@bill-bateman Somehow this notification got lost, I'm sorry about the delay. :(

@bill-bateman
Copy link
Contributor

@Hoverbear no worries!

@Hoverbear
Copy link
Contributor

@LucioFranco I'm asking you to give a second set of eyes to this since I feel like it has some big picture ideas involved I might be yet familiar with.

.meta/sources/http.toml Outdated Show resolved Hide resolved

fn string_to_static_str(s: String) -> &'static str {
//necessary because warp 0.1.18 needs a &'static str for the path
Box::leak(s.into_boxed_str())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is safe, and it definitely looks like a memory leak. https://doc.rust-lang.org/std/boxed/struct.Box.html#method.leak

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it's kinda gross. Couldn't see a way around this if we wanted the path to come from the config, since the version of warp needs &'static str.

If there's a way around this that I don't know then I'd love to know it.

Or if it's more important to not have the leak, we could just pass in a &'static str and have it be hard-coded.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to see it hard coded until we upgrade warp to 0.2.x, since this problem only exists for us in 0.1.x. :)

If we're listening on a socket, I suggest just listen to everything on that socket. The path could be part of the event maybe? @binarylogic 's spec in #328 didn't mention anything about being able to set paths.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK! - I think I'll just make it hard-coded for now, since it doesn't seem to be needed.

You're right, there is to be a way to get the full-path after the fact (see here) but they say

should probably not be used for request matching/routing.

so not sure about that, better I think to have the hard-coded path and worry about this when someone wants a configurable path (by which point we'll hopefully be on warp 0.2.x!)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, for now I think removing path support is fine 👍

@Hoverbear
Copy link
Contributor

@gedkins Can you ammend that commit with a DCO sign? git commit --amend -s

gedkins and others added 2 commits February 13, 2020 18:04
Co-Authored-By: Ana Hobden <operator@hoverbear.org>
Signed-off-by: Giles Edkins <gedkins@bluecatnetworks.com>
Signed-off-by: Giles Edkins <gedkins@bluecatnetworks.com>
Signed-off-by: Bill Bateman <bbateman@bluecatnetworks.com>
let trigger = trigger.clone();
info!("Handling http request: {:?}", headers);

futures::future::result(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw if you bring in the IntoFuture trait then you can do Result::into_future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh nice didn't know about that

pub trait HttpSource: Clone + Send + Sync + 'static {
fn build_event(
&self,
body: impl Buf,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we don't just make this FullBody?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FullBody doesn't seem to exist in 0.2.x versions of warp so I just didn't use it.

0.1.20 (our version) vs 0.2.1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right it should exist since we are using 0.1, right?

}
let svc = filter
.and(warp::path::end())
.boxed()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this need to be boxed here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't need to be


fn string_to_static_str(s: String) -> &'static str {
//necessary because warp 0.1.18 needs a &'static str for the path
Box::leak(s.into_boxed_str())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, for now I think removing path support is fine 👍

Err(r)
}
};
futures::future::result(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be able to use into_future here too

.filter(|s| !s.is_empty())
}

fn decode_body(body: impl Buf, enc: Encoding) -> Result<Vec<Event>, ErrorMessage> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I think it may be a bit better here to do something like:

let body = buf.collect::<BytesMut>();

then for the encodings that need them in lines format we can take advantage of https://github.com/timberio/vector/blob/master/lib/codec/src/lib.rs#L18

then for the json encoding we can do something like:

serde_json::from_slice(&body[..])

This would avoid some additional copying

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok sounds good

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new commit is mostly for this. lots of changes to the body_to_lines function, which is now not the prettiest... let me know what you think of it, if there are better ways to do what I'm doing.

one issue I had is that the codec won't give the last line if it doesn't end in a newline, so I have a special case for that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our codec that we wrote should handle that, you may want to call decode_eof though instead which should handle the last line.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh perfect! Thanks :)

Bill Bateman added 2 commits February 14, 2020 11:37
Signed-off-by: Bill Bateman <bbateman@bluecatnetworks.com>
Signed-off-by: Bill Bateman <bbateman@bluecatnetworks.com>
Signed-off-by: Ana Hobden <operator@hoverbear.org>
Signed-off-by: Ana Hobden <operator@hoverbear.org>
Signed-off-by: Ana Hobden <operator@hoverbear.org>
Signed-off-by: Ana Hobden <operator@hoverbear.org>
@Hoverbear Hoverbear merged commit 1944ae9 into vectordotdev:master Mar 2, 2020
@Hoverbear
Copy link
Contributor

@gedkins Thanks so much for this contribution. You rock!

@binarylogic binarylogic mentioned this pull request Mar 9, 2020
3 tasks
@thameezb
Copy link

Silly question, looking at the code (and the underlying packages) this looks fully capable of listening on IPv6 addresses (note that I am not a rust dev). However during usage we get errors when trying to listen on both ::PORT and [::]:PORT

Is it possible to bind to the IPv6 global address?

@jszwedko
Copy link
Member

jszwedko commented Aug 10, 2023

Silly question, looking at the code (and the underlying packages) this looks fully capable of listening on IPv6 addresses (note that I am not a rust dev). However during usage we get errors when trying to listen on both ::PORT and [::]:PORT

Is it possible to bind to the IPv6 global address?

I responded to this over here: #2508 (comment) . It should be possible to bind to [::]:<port>.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
needs: docs Needs documentation updates
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

9 participants