Skip to content

Commit

Permalink
Don't ack pings twice (grpc#1534)
Browse files Browse the repository at this point in the history
Motivation:

gRPC Swift is emitting two acks per ping. NIOHTTP2 is emitting one and
the keepalive handler is emitting the other.

Modifications:

- Don't emit ping acks from the keep alive handler; just let the H2
  handler do it.

Result:

- No unnecessary ping acks are emitted.
- Resolves grpc#1520
  • Loading branch information
glbrntt authored and WendellXY committed Aug 24, 2023
1 parent 83e689a commit 2555968
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 34 deletions.
4 changes: 4 additions & 0 deletions Sources/GRPC/GRPCIdleHandler.swift
Expand Up @@ -184,6 +184,10 @@ internal final class GRPCIdleHandler: ChannelInboundHandler {
case .none:
()

case .ack:
// NIO's HTTP2 handler acks for us so this is a no-op.
()

case .cancelScheduledTimeout:
self.scheduledClose?.cancel()
self.scheduledClose = nil
Expand Down
12 changes: 6 additions & 6 deletions Sources/GRPC/GRPCKeepaliveHandlers.swift
Expand Up @@ -90,6 +90,7 @@ struct PingHandler {

enum Action {
case none
case ack
case schedulePing(delay: TimeAmount, timeout: TimeAmount)
case cancelScheduledTimeout
case reply(HTTP2Frame.FramePayload)
Expand Down Expand Up @@ -170,35 +171,34 @@ struct PingHandler {
// This is a valid ping, reset our strike count and reply with a pong.
self.pingStrikes = 0
self.lastReceivedPingDate = self.now()
return .reply(self.generatePingFrame(data: pingData, ack: true))
return .ack
}
} else {
// We don't support ping strikes. We'll just reply with a pong.
//
// Note: we don't need to update `pingStrikes` or `lastReceivedPingDate` as we don't
// support ping strikes.
return .reply(self.generatePingFrame(data: pingData, ack: true))
return .ack
}
}

mutating func pingFired() -> Action {
if self.shouldBlockPing {
return .none
} else {
return .reply(self.generatePingFrame(data: self.pingData, ack: false))
return .reply(self.generatePingFrame(data: self.pingData))
}
}

private mutating func generatePingFrame(
data: HTTP2PingData,
ack: Bool
data: HTTP2PingData
) -> HTTP2Frame.FramePayload {
if self.activeStreams == 0 {
self.sentPingsWithoutData += 1
}

self.lastSentPingDate = self.now()
return HTTP2Frame.FramePayload.ping(data, ack: ack)
return HTTP2Frame.FramePayload.ping(data, ack: false)
}

/// Returns true if, on receipt of a ping, the ping should be regarded as a ping strike.
Expand Down
124 changes: 96 additions & 28 deletions Tests/GRPCTests/GRPCPingHandlerTests.swift
Expand Up @@ -15,6 +15,7 @@
*/
@testable import GRPC
import NIOCore
import NIOEmbedded
import NIOHTTP2
import XCTest

Expand Down Expand Up @@ -249,35 +250,23 @@ class GRPCPingHandlerTests: GRPCTestCase {
pingData: HTTP2PingData(withInteger: 1),
ack: false
)
XCTAssertEqual(
response,
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
)
XCTAssertEqual(response, .ack)

// Received another ping, response should be a pong (ping strikes not in effect)
response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
XCTAssertEqual(
response,
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
)
XCTAssertEqual(response, .ack)

// Received another ping, response should be a pong (ping strikes not in effect)
response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
XCTAssertEqual(
response,
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
)
XCTAssertEqual(response, .ack)
}

func testPingWithoutDataResultsInPongForClient() {
// Don't allow _sending_ pings when no calls are active (receiving pings should be tolerated).
self.setupPingHandler(permitWithoutCalls: false)

let action = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
XCTAssertEqual(
action,
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
)
XCTAssertEqual(action, .ack)
}

func testPingWithoutDataResultsInPongForServer() {
Expand All @@ -291,10 +280,7 @@ class GRPCPingHandlerTests: GRPCTestCase {
)

let action = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
XCTAssertEqual(
action,
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
)
XCTAssertEqual(action, .ack)
}

func testPingStrikesOnServer() {
Expand All @@ -312,10 +298,7 @@ class GRPCPingHandlerTests: GRPCTestCase {
pingData: HTTP2PingData(withInteger: 1),
ack: false
)
XCTAssertEqual(
response,
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
)
XCTAssertEqual(response, .ack)

// Received another ping, which is invalid (ping strike), response should be no action
response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
Expand All @@ -326,10 +309,7 @@ class GRPCPingHandlerTests: GRPCTestCase {

// Received another ping, which is valid now, response should be a pong
response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
XCTAssertEqual(
response,
.reply(HTTP2Frame.FramePayload.ping(HTTP2PingData(withInteger: 1), ack: true))
)
XCTAssertEqual(response, .ack)

// Received another ping, which is invalid (ping strike), response should be no action
response = self.pingHandler.read(pingData: HTTP2PingData(withInteger: 1), ack: false)
Expand Down Expand Up @@ -381,6 +361,8 @@ extension PingHandler.Action: Equatable {
switch (lhs, rhs) {
case (.none, .none):
return true
case (.ack, .ack):
return true
case (let .schedulePing(lhsDelay, lhsTimeout), let .schedulePing(rhsDelay, rhsTimeout)):
return lhsDelay == rhsDelay && lhsTimeout == rhsTimeout
case (.cancelScheduledTimeout, .cancelScheduledTimeout):
Expand All @@ -401,3 +383,89 @@ extension PingHandler.Action: Equatable {
}
}
}

extension GRPCPingHandlerTests {
func testSingleAckIsEmittedOnPing() throws {
let client = EmbeddedChannel()
let _ = try client.configureHTTP2Pipeline(mode: .client) { _ in
fatalError("Unexpected inbound stream")
}.wait()

let server = EmbeddedChannel()
let serverMux = try server.configureHTTP2Pipeline(mode: .server) { _ in
fatalError("Unexpected inbound stream")
}.wait()

let idleHandler = GRPCIdleHandler(
idleTimeout: .minutes(5),
keepalive: .init(),
logger: self.serverLogger
)
try server.pipeline.syncOperations.addHandler(idleHandler, position: .before(serverMux))
try server.connect(to: .init(unixDomainSocketPath: "/ignored")).wait()
try client.connect(to: .init(unixDomainSocketPath: "/ignored")).wait()

func interact(client: EmbeddedChannel, server: EmbeddedChannel) throws {
var didRead = true
while didRead {
didRead = false

if let data = try client.readOutbound(as: ByteBuffer.self) {
didRead = true
try server.writeInbound(data)
}

if let data = try server.readOutbound(as: ByteBuffer.self) {
didRead = true
try client.writeInbound(data)
}
}
}

try interact(client: client, server: server)

// Settings.
let f1 = try XCTUnwrap(client.readInbound(as: HTTP2Frame.self))
f1.payload.assertSettings(ack: false)

// Settings ack.
let f2 = try XCTUnwrap(client.readInbound(as: HTTP2Frame.self))
f2.payload.assertSettings(ack: true)

// Send a ping.
let ping = HTTP2Frame(streamID: .rootStream, payload: .ping(.init(withInteger: 42), ack: false))
try client.writeOutbound(ping)
try interact(client: client, server: server)

// Ping ack.
let f3 = try XCTUnwrap(client.readInbound(as: HTTP2Frame.self))
f3.payload.assertPing(ack: true)

XCTAssertNil(try client.readInbound(as: HTTP2Frame.self))
}
}

extension HTTP2Frame.FramePayload {
func assertSettings(ack: Bool, file: StaticString = #file, line: UInt = #line) {
switch self {
case let .settings(settings):
switch settings {
case .ack:
XCTAssertTrue(ack, file: file, line: line)
case .settings:
XCTAssertFalse(ack, file: file, line: line)
}
default:
XCTFail("Expected .settings got \(self)", file: file, line: line)
}
}

func assertPing(ack: Bool, file: StaticString = #file, line: UInt = #line) {
switch self {
case let .ping(_, ack: pingAck):
XCTAssertEqual(pingAck, ack, file: file, line: line)
default:
XCTFail("Expected .ping got \(self)", file: file, line: line)
}
}
}

0 comments on commit 2555968

Please sign in to comment.