Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support extra listeners on NLBs #529

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ Overview of configuration which can be set via Ingress annotations.
|`zalando.org/aws-load-balancer-ssl-policy`|`string`|`ELBSecurityPolicy-2016-08`|
|`zalando.org/aws-load-balancer-type`| `nlb` \| `alb`|`alb`|
|`zalando.org/aws-load-balancer-http2`| `true` \| `false`|`true`|
|[`zalando.org/aws-nlb-extra-listeners`](#extra-listen-ports)|`string`|N/A|
|`zalando.org/aws-waf-web-acl-id` | `string` | N/A |
|`kubernetes.io/ingress.class`|`string`|N/A|

Expand Down Expand Up @@ -709,6 +710,29 @@ being managed through a target group type is `ip`, which means there is no neces
| `AWSCNI` | `false` | `true` | PodIP != HostIP: limited scaling and host bound |
| `AWSCNI` | `false` | `false` | free scaling, pod VPC CNI IP used |

## Advanced Options for NLBs

### Extra Listen Ports

Some real world scenarios may require non-standard TCP or UDP ingresses. The `zalando.org/aws-nlb-extra-listeners`
annotation allows you to specify a list of additional listeners to add to your NLB. The value of the annotation should
be a valid JSON string of the following format.

```json
[
{
"protocol": "TCP",
"listenport": 22,
"targetport": 2222,
"podlabel": "application=ssh-service"
}
]
```

The `podlabel` value is used to register targets in the target group associated with the listener. This depends on the
AWS CNI Mode feature, where individual pods receive accessible IP addresses. The value is used to identify pods running
in the same namespace as the ingress that will receive traffic from the load balancer.
jhuntwork marked this conversation as resolved.
Show resolved Hide resolved

## Trying it out

The Ingress Controller's responsibility is limited to managing load balancers, as described above. To have a fully
Expand Down
113 changes: 85 additions & 28 deletions aws/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,19 @@ type Adapter struct {

type TargetCNIconfig struct {
Enabled bool
TargetGroupCh chan []string
TargetGroupCh chan []TargetGroupWithLabels
}

type TargetGroupWithLabels struct {
ARN string
PodNamespace string
PodLabel string
}
type CNIEndpoint struct {
IPAddress string
Namespace string
Podlabel string
}
type manifest struct {
securityGroup *securityGroupDetails
instance *instanceDetails
Expand Down Expand Up @@ -250,7 +260,7 @@ func NewAdapter(clusterID, newControllerID, vpcID string, debug, disableInstrume
customFilter: DefaultCustomFilter,
TargetCNI: &TargetCNIconfig{
Enabled: false,
TargetGroupCh: make(chan []string, 10),
TargetGroupCh: make(chan []TargetGroupWithLabels, 10),
},
}

Expand Down Expand Up @@ -655,8 +665,33 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble
a.TargetCNI.TargetGroupCh <- targetTypesARNs[elbv2.TargetTypeEnumIp]
}

// run through any target groups with ALB targets and register all ALBs
for _, tg := range targetTypesARNs[elbv2.TargetTypeEnumAlb] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's about NLBs?

albARNs := make([]string, 0, len(stacks))
for _, stack := range stacks {
if stack.LoadBalancerType == LoadBalancerTypeApplication {
albARNs = append(albARNs, stack.loadbalancerARN)
}
}
registeredTargets, err := a.getRegisteredTargets(tg.ARN)
if err != nil {
problems.Add("failed to get existing targets: %w", err)
}
if err := a.registerAndDeregister(albARNs, registeredTargets, tg.ARN); err != nil {
problems.Add("failed to update target registration %w", err)
}
}

// remove the IP TGs from the list keeping all other TGs including problematic #127 and nonexistent #436
targetGroupARNs := difference(allTargetGroupARNs, targetTypesARNs[elbv2.TargetTypeEnumIp])
var targetGroupARNs []string
for targetType, tgList := range targetTypesARNs {
if targetType == elbv2.TargetTypeEnumIp || targetType == elbv2.TargetTypeEnumAlb {
continue
}
for _, tg := range tgList {
targetGroupARNs = append(targetGroupARNs, tg.ARN)
}
}

ownerTags := map[string]string{
clusterIDTagPrefix + a.ClusterID(): resourceLifecycleOwned,
Expand Down Expand Up @@ -702,7 +737,7 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble
// All the required resources (listeners and target group) are created in a
// transactional fashion.
// Failure to create the stack causes it to be deleted automatically.
func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool) (string, error) {
func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool, extraListeners []ExtraListener) (string, error) {
certARNs := make(map[string]time.Time, len(certificateARNs))
for _, arn := range certificateARNs {
certARNs[arn] = time.Time{}
Expand Down Expand Up @@ -754,6 +789,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o
httpRedirectToHTTPS: a.httpRedirectToHTTPS,
nlbCrossZone: a.nlbCrossZone,
http2: http2,
extraListeners: extraListeners,
tags: a.stackTags,
internalDomains: a.internalDomains,
denyInternalDomains: a.denyInternalDomains,
Expand All @@ -767,7 +803,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o
return createStack(a.cloudformation, spec)
}

func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.Time, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool) (string, error) {
func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.Time, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool, extraListeners []ExtraListener) (string, error) {
if _, ok := SSLPolicies[sslPolicy]; !ok {
return "", fmt.Errorf("invalid SSLPolicy '%s' defined", sslPolicy)
}
Expand Down Expand Up @@ -810,6 +846,7 @@ func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.
httpRedirectToHTTPS: a.httpRedirectToHTTPS,
nlbCrossZone: a.nlbCrossZone,
http2: http2,
extraListeners: extraListeners,
tags: a.stackTags,
internalDomains: a.internalDomains,
denyInternalDomains: a.denyInternalDomains,
Expand Down Expand Up @@ -1102,36 +1139,56 @@ func nonTargetedASGs(ownedASGs, targetedASGs map[string]*autoScalingGroupDetails
return nonTargetedASGs
}

func (a *Adapter) getRegisteredTargets(tgARN string) ([]string, error) {
tgh, err := a.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: &tgARN})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

&tgARN is correct but we should use aws.String(tgARN)

if err != nil {
log.Errorf("unable to describe target health %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.Errorf("unable to describe target health %v", err) -> log.Errorf("Failed to describe target health %v", err)

return []string{}, err
}
registeredTargets := make([]string, len(tgh.TargetHealthDescriptions))
for i, target := range tgh.TargetHealthDescriptions {
registeredTargets[i] = *target.Target.Id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*target.Target.Id is correct but we should use aws.StringValue(target.Target.Id) instead

}
return registeredTargets, nil
}

func (a *Adapter) registerAndDeregister(new []string, old []string, tgARN string) error {
toRegister := difference(new, old)
if len(toRegister) > 0 {
log.Info("Registering CNI targets: ", toRegister)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better use formatstring

err := registerTargetsOnTargetGroups(a.elbv2, []string{tgARN}, toRegister)
if err != nil {
return err
}
}
toDeregister := difference(old, new)
if len(toDeregister) > 0 {
log.Info("Deregistering CNI targets: ", toDeregister)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better use formatstring

err := deregisterTargetsOnTargetGroups(a.elbv2, []string{tgARN}, toDeregister)
if err != nil {
return err
}
}
return nil
}

// SetTargetsOnCNITargetGroups implements desired state for CNI target groups
// by polling the current list of targets thus creating a diff of what needs to be added and removed.
func (a *Adapter) SetTargetsOnCNITargetGroups(endpoints, cniTargetGroupARNs []string) error {
log.Debugf("setting targets on CNI target groups: '%v'", cniTargetGroupARNs)
for _, targetGroupARN := range cniTargetGroupARNs {
tgh, err := a.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: &targetGroupARN})
func (a *Adapter) SetTargetsOnCNITargetGroups(endpoints []CNIEndpoint, cniTargetGroups []TargetGroupWithLabels) error {
log.Debugf("setting targets on CNI target groups: '%v'", cniTargetGroups)
for _, targetGroup := range cniTargetGroups {
registeredTargets, err := a.getRegisteredTargets(targetGroup.ARN)
if err != nil {
log.Errorf("unable to describe target health %v", err)
// continue for processing of the rest of the target groups
continue
}
registeredInstances := make([]string, len(tgh.TargetHealthDescriptions))
for i, target := range tgh.TargetHealthDescriptions {
registeredInstances[i] = *target.Target.Id
}
toRegister := difference(endpoints, registeredInstances)
if len(toRegister) > 0 {
log.Info("Registering CNI targets: ", toRegister)
err := registerTargetsOnTargetGroups(a.elbv2, []string{targetGroupARN}, toRegister)
if err != nil {
return err
var matchingEndpoints []string
for _, endpoint := range endpoints {
if endpoint.Podlabel == targetGroup.PodLabel && endpoint.Namespace == targetGroup.PodNamespace {
matchingEndpoints = append(matchingEndpoints, endpoint.IPAddress)
}
}
toDeregister := difference(registeredInstances, endpoints)
if len(toDeregister) > 0 {
log.Info("Deregistering CNI targets: ", toDeregister)
err := deregisterTargetsOnTargetGroups(a.elbv2, []string{targetGroupARN}, toDeregister)
if err != nil {
return err
}
if err := a.registerAndDeregister(matchingEndpoints, registeredTargets, targetGroup.ARN); err != nil {
return err
}
}
return nil
Expand Down
95 changes: 90 additions & 5 deletions aws/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,8 +952,91 @@ func TestWithxlbHealthyThresholdCount(t *testing.T) {
})
}

func Test_getRegisteredTargets(t *testing.T) {
t.Run("should return an error if unable to describe target health", func(t *testing.T) {
m := &fake.ELBv2Client{
Outputs: fake.ELBv2Outputs{
DescribeTargetHealth: fake.R(
elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{}},
fmt.Errorf("error")),
RegisterTargets: fake.R(fake.MockDeregisterTargetsOutput(), nil),
DeregisterTargets: fake.R(fake.MockDeregisterTargetsOutput(), nil),
},
}
a := Adapter{elbv2: m}
_, err := a.getRegisteredTargets("none")
require.Error(t, err)
})
t.Run("should a slice of strings representing the ids of found targets", func(t *testing.T) {
thOut := elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{
{Target: &elbv2.TargetDescription{Id: aws.String("asg1")}},
{Target: &elbv2.TargetDescription{Id: aws.String("asg2")}},
{Target: &elbv2.TargetDescription{Id: aws.String("blah")}},
}}
expected := []string{"asg1", "asg2", "blah"}
m := &fake.ELBv2Client{
Outputs: fake.ELBv2Outputs{
DescribeTargetHealth: fake.R(&thOut, nil),
RegisterTargets: fake.R(fake.MockDeregisterTargetsOutput(), nil),
DeregisterTargets: fake.R(fake.MockDeregisterTargetsOutput(), nil),
},
}
a := Adapter{elbv2: m}
response, err := a.getRegisteredTargets("none")
require.Nil(t, err)
require.Equal(t, expected, response)
})
}

func Test_registerAndDeregister(t *testing.T) {
t.Run("should return an error if unable to register new targets", func(t *testing.T) {
m := &fake.ELBv2Client{
Outputs: fake.ELBv2Outputs{
RegisterTargets: fake.R(fake.MockDeregisterTargetsOutput(), fmt.Errorf("this is an error")),
DeregisterTargets: fake.R(fake.MockDeregisterTargetsOutput(), nil),
},
}
a := Adapter{elbv2: m}
err := a.registerAndDeregister([]string{"new"}, []string{"old"}, "none")
require.Error(t, err)
})
t.Run("should return an error if unable to deregister new targets", func(t *testing.T) {
m := &fake.ELBv2Client{
Outputs: fake.ELBv2Outputs{
RegisterTargets: fake.R(fake.MockDeregisterTargetsOutput(), nil),
DeregisterTargets: fake.R(fake.MockDeregisterTargetsOutput(), fmt.Errorf("this is an error")),
},
}
a := Adapter{elbv2: m}
err := a.registerAndDeregister([]string{"new"}, []string{"old"}, "none")
require.Error(t, err)
})
t.Run("should return nil if there's nothing to register", func(t *testing.T) {
m := &fake.ELBv2Client{
Outputs: fake.ELBv2Outputs{
RegisterTargets: fake.R(fake.MockDeregisterTargetsOutput(), fmt.Errorf("this is an error")),
DeregisterTargets: fake.R(fake.MockDeregisterTargetsOutput(), fmt.Errorf("this is also an error")),
},
}
a := Adapter{elbv2: m}
err := a.registerAndDeregister([]string{"same"}, []string{"same"}, "none")
require.Nil(t, err)
})
t.Run("should return nil if there's no upstream errors", func(t *testing.T) {
m := &fake.ELBv2Client{
Outputs: fake.ELBv2Outputs{
RegisterTargets: fake.R(fake.MockDeregisterTargetsOutput(), nil),
DeregisterTargets: fake.R(fake.MockDeregisterTargetsOutput(), nil),
},
}
a := Adapter{elbv2: m}
err := a.registerAndDeregister([]string{"new"}, []string{"old"}, "none")
require.Nil(t, err)
})
}

func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
tgARNs := []string{"asg1"}
tgARNs := []TargetGroupWithLabels{{ARN: "asg1"}}
thOut := elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{}}
m := &fake.ELBv2Client{
Outputs: fake.ELBv2Outputs{
Expand All @@ -965,7 +1048,7 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
a := &Adapter{elbv2: m, TargetCNI: &TargetCNIconfig{}}

t.Run("adding a new endpoint", func(t *testing.T) {
require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1"}, tgARNs))
require.NoError(t, a.SetTargetsOnCNITargetGroups([]CNIEndpoint{{IPAddress: "1.1.1.1"}}, tgARNs))
require.Equal(t, []*elbv2.RegisterTargetsInput{{
TargetGroupArn: aws.String("asg1"),
Targets: []*elbv2.TargetDescription{{Id: aws.String("1.1.1.1")}},
Expand All @@ -979,7 +1062,8 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
}
m.Rtinputs, m.Dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}, tgARNs))
require.NoError(t, a.SetTargetsOnCNITargetGroups(
[]CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "2.2.2.2"}, {IPAddress: "3.3.3.3"}}, tgARNs))
require.Equal(t, []*elbv2.TargetDescription{
{Id: aws.String("2.2.2.2")},
{Id: aws.String("3.3.3.3")},
Expand All @@ -995,7 +1079,7 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
}}
m.Rtinputs, m.Dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "3.3.3.3"}, tgARNs))
require.NoError(t, a.SetTargetsOnCNITargetGroups([]CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "3.3.3.3"}}, tgARNs))
require.Equal(t, []*elbv2.RegisterTargetsInput(nil), m.Rtinputs)
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("2.2.2.2")}}, m.Dtinputs[0].Targets)
})
Expand All @@ -1008,7 +1092,8 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) {
}}
m.Rtinputs, m.Dtinputs = nil, nil

require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}, tgARNs))
require.NoError(t, a.SetTargetsOnCNITargetGroups(
[]CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "2.2.2.2"}, {IPAddress: "3.3.3.3"}}, tgARNs))
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("3.3.3.3")}}, m.Rtinputs[0].Targets)
require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("4.4.4.4")}}, m.Dtinputs[0].Targets)
})
Expand Down
29 changes: 26 additions & 3 deletions aws/asg.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,39 @@ func describeTargetGroups(elbv2svc elbv2iface.ELBV2API) (map[string]struct{}, er
}

// map the target group slice into specific types such as instance, ip, etc
func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []string) (map[string][]string, error) {
targetTypes := make(map[string][]string)
func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []string) (map[string][]TargetGroupWithLabels, error) {
targetTypes := make(map[string][]TargetGroupWithLabels)
err := elbv2svc.DescribeTargetGroupsPagesWithContext(context.TODO(), &elbv2.DescribeTargetGroupsInput{},
func(resp *elbv2.DescribeTargetGroupsOutput, lastPage bool) bool {
for _, tg := range resp.TargetGroups {
for _, v := range allTGARNs {
if v != aws.StringValue(tg.TargetGroupArn) {
continue
}
targetTypes[aws.StringValue(tg.TargetType)] = append(targetTypes[aws.StringValue(tg.TargetType)], aws.StringValue(tg.TargetGroupArn))
var podlabel, podnamespace string
log.Debugf("Looking for tags on %s", aws.StringValue(tg.TargetGroupArn))
out, err := elbv2svc.DescribeTags(&elbv2.DescribeTagsInput{ResourceArns: []*string{tg.TargetGroupArn}})
if err != nil {
log.Errorf("cannot describe tags on target group: %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always start uppercase for logs "Failed to describe...".

Copy link
Member

@szuecs szuecs Dec 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to continue here, because otherwise you work with empty string below the else.
Or omit setting the label with empty string.
Wdyt?

} else {
for _, desc := range out.TagDescriptions {
for _, tag := range desc.Tags {
switch aws.StringValue(tag.Key) {
case podLabelTag:
podlabel = aws.StringValue(tag.Value)
case podNamespaceTag:
podnamespace = aws.StringValue(tag.Value)
}
}
}
}
log.Debugf("Adding tg with label: '%s' in namespace: '%s'", podlabel, podnamespace)
targetTypes[aws.StringValue(tg.TargetType)] = append(
targetTypes[aws.StringValue(tg.TargetType)],
TargetGroupWithLabels{
ARN: aws.StringValue(tg.TargetGroupArn),
PodLabel: podlabel,
PodNamespace: podnamespace})
}
}
return true
Expand Down