
Improve the esleg bulk response parsing #36275

Merged
merged 20 commits into elastic:main on Aug 23, 2023

Conversation

alexsapran
Contributor

Proposed commit message

I am raising this PR to start a discussion about potential improvements I found while reviewing some Beats profiles.

While running some synthetic benchmarks, I noticed that we are using ioutil.ReadAll, which is deprecated. While going down the path of replacing it, I took a deeper look into how we execute bulk requests in the esleg package.
I introduced a Go benchmark to baseline my changes; here is the comparison between main and the proposed changes:

```
goos: darwin
goarch: arm64
pkg: github.com/elastic/beats/v7/libbeat/esleg/eslegclient
                   │ baseline.txt │   initialize_per_connection.txt   │
                   │    sec/op    │   sec/op     vs base              │
ExecHTTPRequest-12    2.995µ ± 2%   2.905µ ± 1%  -3.01% (p=0.000 n=8)

                   │ baseline.txt │   initialize_per_connection.txt    │
                   │     B/op     │     B/op      vs base              │
ExecHTTPRequest-12   6.055Ki ± 0%   5.555Ki ± 0%  -8.26% (p=0.000 n=8)

                   │ baseline.txt │  initialize_per_connection.txt   │
                   │  allocs/op   │ allocs/op   vs base              │
ExecHTTPRequest-12     27.00 ± 0%   26.00 ± 0%  -3.70% (p=0.000 n=8)
```

We can see that by replacing ioutil.ReadAll and adding a reusable bytes.Buffer for consuming the response, we managed to save on memory allocations.

Additionally, I will be pushing another commit that reduces the overall response we get from the _bulk request, which will further reduce the amount of memory we need to copy/allocate for the response.
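To illustrate the approach, here is a minimal sketch of reading the response through a reusable buffer instead of ioutil.ReadAll (the type and function names below are illustrative, not the PR's actual identifiers):

```go
package eslegclient

import (
	"bytes"
	"io"
	"net/http"
)

// connSketch keeps one buffer per connection so that reading a response does
// not allocate a fresh byte slice on every request, the way ioutil.ReadAll does.
type connSketch struct {
	responseBuffer *bytes.Buffer
}

func (c *connSketch) readResponse(resp *http.Response) ([]byte, error) {
	c.responseBuffer.Reset() // drop the previous contents but keep the capacity
	if _, err := io.Copy(c.responseBuffer, resp.Body); err != nil {
		return nil, err
	}
	// The returned slice aliases the buffer and is only valid until the next
	// Reset, so callers must consume it before the next request is executed.
	return c.responseBuffer.Bytes(), nil
}
```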

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding changes to the default configuration files
  • I have added tests that prove my fix is effective or that my feature works
  • I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc.

Author's Checklist

  • [ ]

How to test this PR locally

Related issues

Use cases

Screenshots

Logs

Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>

This go benchmark is going to serve as our baseline for future changes.

Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
When we create a new connection, we also create a new bytes.Buffer, which we
use to consume the HTTP response via io.Copy, which produces fewer memory
allocations.

Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
@alexsapran alexsapran added the enhancement and backport-skip labels Aug 9, 2023
@alexsapran alexsapran self-assigned this Aug 9, 2023
@botelastic botelastic bot added the needs_team label Aug 9, 2023
@botelastic

botelastic bot commented Aug 9, 2023

This pull request doesn't have a Team:<team> label.

@elasticmachine
Collaborator

elasticmachine commented Aug 9, 2023

💚 Build Succeeded


Build stats

  • Start Time: 2023-08-22T14:29:27.262+0000

  • Duration: 70 min 6 sec

Test stats 🧪

Test Results
  • Failed: 0
  • Passed: 28075
  • Skipped: 2015
  • Total: 30090

💚 Flaky test report

Tests succeeded.

🤖 GitHub comments


To re-run your PR in the CI, just comment with:

  • /test : Re-trigger the build.

  • /package : Generate the packages and run the E2E tests.

  • /beats-tester : Run the installation tests with beats-tester.

  • run elasticsearch-ci/docs : Re-trigger the docs validation. (use unformatted text in the comment!)

Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
This commit adds a new parameter in the output settings called
`bulk_response_filtering`, which defaults to true.

When this parameter is true, the ResponseFiltering parameter is appended to
the bulk request the Beat is making.
We are adding this parameter so that we can opt out if we want to diagnose a
problem, or if someone wants to keep using the older way we execute bulk requests.

Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
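For context on the filtering mentioned above: Elasticsearch supports trimming responses with the filter_path query parameter. A minimal sketch of how such a filter could be appended to the bulk URL (the helper name and the exact filter expression are assumptions for illustration, not necessarily what this PR uses):

```go
package eslegclient

import "net/url"

// bulkURLWithFiltering appends a filter_path expression so the _bulk response
// only carries the fields the client actually inspects.
func bulkURLWithFiltering(base string, filtering bool) (string, error) {
	u, err := url.Parse(base + "/_bulk")
	if err != nil {
		return "", err
	}
	if filtering {
		q := u.Query()
		// Example expression: keep only the error flag plus per-item errors and statuses.
		q.Set("filter_path", "errors,items.*.error,items.*.status")
		u.RawQuery = q.Encode()
	}
	return u.String(), nil
}
```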
This commit addresses the lint error for the unhandled error not being checked.

Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
This is to address the linting rule that triggered the following
violation.

```
should rewrite http.NewRequestWithContext or add (*Request).WithContext (noctx)
```
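For reference, the noctx rule is satisfied by building requests with an explicit context. A minimal sketch, with a placeholder URL and body rather than the code this commit actually touches:

```go
package eslegclient

import (
	"context"
	"net/http"
	"strings"
)

// doBulk builds the request with a context so the caller can cancel or time
// out the HTTP call, which is what the noctx linter checks for.
func doBulk(ctx context.Context, client *http.Client, url, body string) (*http.Response, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, strings.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/x-ndjson")
	return client.Do(req)
}
```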

Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
@alexsapran alexsapran marked this pull request as ready for review August 16, 2023 17:00
@alexsapran alexsapran requested a review from a team as a code owner August 16, 2023 17:00
@ycombinator
Contributor

@leehinman Will this improvement automatically benefit the shipper or does the same change need to be made elsewhere for the shipper as well?

@leehinman
Contributor

@leehinman Will this improvement automatically benefit the shipper or does the same change need to be made elsewhere for the shipper as well?

It should be automatic; when Filebeat is acting as an Elasticsearch shipper it is using the 'elasticsearch' output.

@alexsapran
Contributor Author

alexsapran commented Aug 17, 2023

@ycombinator

does the same change need to be made elsewhere for the shipper as well?

A similar change needs to be made for the other parts of the codebase that use ioutil.ReadAll to consume the response, which allocates more memory than, for example, io.Copy.

Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
This completes the migration of the bulk filtering from the esleg package to the output package.

Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
Since all tests before this change were already using the filtered response and passing, there is no need for another flag. This change actually makes the tests reflect the actual behaviour more closely.

Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
```go
}

conn := Connection{
	ConnectionSettings: s,
	HTTP:               esClient,
	Encoder:            encoder,
	log:                logger,
	responseBuffer:     bytes.NewBuffer(nil),
```
Contributor

I think this field needs to be protected by a mutex.

Let's say two goroutines, G1 and G2, are running concurrently, and both end up calling conn.execHTTPRequest at some point in their execution. Now imagine this sequence of events:

  1. G1 copies resp.Body into conn.responseBuffer (line 458).
  2. G2 calls conn.responseBuffer.Reset() (line 457).
  3. G1 returns conn.responseBuffer.Bytes() (line 468).

The result will be that G1 will return an empty slice of bytes.
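A minimal sketch of the kind of guard being discussed, purely for illustration; as the thread below concludes, the PR documents the non-thread-safety instead of adding a lock:

```go
package eslegclient

import (
	"bytes"
	"io"
	"net/http"
	"sync"
)

// guardedConn serialises access to the shared buffer, which would prevent the
// interleaving described above at the cost of serialising the requests too.
type guardedConn struct {
	mu             sync.Mutex
	responseBuffer *bytes.Buffer
}

func (c *guardedConn) readResponse(resp *http.Response) ([]byte, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.responseBuffer.Reset()
	if _, err := io.Copy(c.responseBuffer, resp.Body); err != nil {
		return nil, err
	}
	// Copy out so the caller's slice survives a later Reset by another goroutine.
	out := make([]byte, c.responseBuffer.Len())
	copy(out, c.responseBuffer.Bytes())
	return out, nil
}
```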

Contributor Author

You bring up a valid concern here. Before going in and adding a mutex, which would potentially defeat what we want to achieve and actually slow the process down: I might be wrong, but my assumption was that each goroutine would have its own connection. One connection would not be shared between two goroutines; if two goroutines needed a connection to ES, they would each create a new connection client and thus a new buffer.
If we absolutely need to add a mutex, I would rather change the implementation, because if there were a case where two goroutines shared the same connection, then adding a mutex would be similar to not having goroutines in the first place.

Contributor

Yes, you're right that as it stands today the only way two goroutines could concurrently call the execHTTPRequest method is if they were sharing the same Connection object. And from what I can tell, that is not happening anywhere in the code.

Besides checking this by eyeballing the code paths that lead to the execHTTPRequest method, I also ran go test -race ./libbeat/... -count 1 | grep -i 'data race' | wc -l from the root folder of the Beats repo, first on the main branch and then on this PR's branch. In both cases I got the same results, which is a quick sanity check that the changes in this PR are most likely not introducing additional data races:

Running in main

```
$ go test -race ./libbeat/... -count 1 | grep -i 'data race' | wc -l
      22
```

Running in this PR's branch

```
$ go test -race ./libbeat/... -count 1 | grep -i 'data race' | wc -l
      22
```

Side note: for the data races that are being reported on main, I filed #36393.


However, in the future, code may be written such that two concurrently-executing goroutines are sharing the same Connection object.

The safest thing to do here would be to add a mutex, but as you noted, this will likely cut into the performance gains we are seeing from the change in this PR.

The next best thing to do, IMO, would be to document clearly in a comment above the execHTTPRequest method, AND in comments above each of the constructor functions that return a new Connection (NewConnection, NewClients, and NewConnectedClient), AND in a comment above the Connection struct itself, that this method/struct is not thread-safe. You could add a link in your comments to this discussion here on GitHub in case someone wants to know all the details.
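For illustration, such a comment might look roughly like this (the wording is a sketch, not the text that was actually committed):

```go
package eslegclient

// Connection manages a connection to a single Elasticsearch host.
//
// NOTE: Connection is not thread-safe. Its response buffer is reused across
// requests, so a Connection (and execHTTPRequest in particular) must not be
// shared between goroutines; create one Connection per goroutine instead.
// See the discussion in elastic/beats#36275 for details.
type Connection struct {
	// ...
}
```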

Contributor Author

❤️ for the help here. I agree that race conditions are something to be looked at in different work streams.

However, in the future, code may be written such that two concurrently-executing goroutines are sharing the same Connection object.

I hope that once we move away from esleg to a different output client, this part of the code will not be used anymore.

I added some comments about the thread-safety; please take a look at f181725.

Contributor

Comments LGTM. Thanks @alexsapran!

Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
Signed-off-by: Alexandros, Sapranidis <alexandros@elastic.co>
@ycombinator ycombinator left a comment
Contributor

LGTM. Nice performance improvements!

@alexsapran alexsapran merged commit 1a7e9aa into elastic:main Aug 23, 2023
86 checks passed
Scholar-Li pushed a commit to Scholar-Li/beats that referenced this pull request Feb 5, 2024
Comment on lines +462 to +463

```go
conn.responseBuffer.Reset()
_, err = io.Copy(conn.responseBuffer, resp.Body)
```
Member

@alexsapran I think we should not re-use the buffer here because the size is not static. This buffer will never release its allocated memory, and the size of this memory will be equal to the size of the biggest HTTP response ever processed. I'd prefer to release the memory and re-allocate smaller buffers when needed.

I think we should switch it in favour of elastic/elastic-agent-libs#183 and compare performance. This implementation should be the most optimized one.

Contributor Author

Yes, you are correct @rdner about the use case you mention, but this might be just fine in this particular scenario because the response size is tied to the batching we do.

So the response size stays within roughly the same ballpark for each bulk we publish.
We can adjust this and make it so that we only allocate a buffer for the consumption of that response.
Since we are in the process of evaluating other usages of ioutil.ReadAll, it might make sense to adjust this as well.
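A rough sketch of that alternative, allocating a right-sized buffer per response instead of keeping one on the connection (illustrative only):

```go
package eslegclient

import (
	"bytes"
	"io"
	"net/http"
)

// readResponseOnce sizes a fresh buffer from Content-Length (when known) so
// the memory is released after each response instead of being retained on the
// connection for the lifetime of the client.
func readResponseOnce(resp *http.Response) ([]byte, error) {
	var buf bytes.Buffer
	if resp.ContentLength > 0 {
		buf.Grow(int(resp.ContentLength)) // pre-size to avoid growth copies
	}
	if _, err := io.Copy(&buf, resp.Body); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}
```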

Labels

  • backport-skip: Skip notification from the automated backport with mergify
  • enhancement
  • needs_team: Indicates that the issue/PR needs a Team:* label
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants