/
quotas.go
1092 lines (919 loc) · 29 KB
/
quotas.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package quotas
import (
"context"
"errors"
"fmt"
"path"
"strings"
"sync"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/vault/helper/metricsutil"
"github.com/hashicorp/vault/helper/namespace"
"github.com/hashicorp/vault/sdk/helper/pathmanager"
"github.com/hashicorp/vault/sdk/logical"
)
// Type represents the quota kind
type Type string
const (
// TypeRateLimit represents the rate limiting quota type
TypeRateLimit Type = "rate-limit"
// TypeLeaseCount represents the lease count limiting quota type
TypeLeaseCount Type = "lease-count"
)
// LeaseAction is the action taken by the expiration manager on the lease. The
// quota manager will use this information to update the lease path cache and
// updating counters for relevant quota rules.
type LeaseAction uint32
// String converts each lease action into its string equivalent value
func (la LeaseAction) String() string {
switch la {
case LeaseActionLoaded:
return "loaded"
case LeaseActionCreated:
return "created"
case LeaseActionDeleted:
return "deleted"
case LeaseActionAllow:
return "allow"
}
return "unknown"
}
const (
_ LeaseAction = iota
// LeaseActionLoaded indicates loading of lease in the expiration manager after
// unseal.
LeaseActionLoaded
// LeaseActionCreated indicates that a lease is created in the expiration manager.
LeaseActionCreated
// LeaseActionDeleted indicates that is lease is expired and deleted in the
// expiration manager.
LeaseActionDeleted
// LeaseActionAllow will be used to indicate the lease count checker that
// incCounter is called from Allow(). All the rest of the actions indicate the
// action took place on the lease in the expiration manager.
LeaseActionAllow
)
type leaseWalkFunc func(context.Context, func(request *Request) bool) error
// String converts each quota type into its string equivalent value
func (q Type) String() string {
switch q {
case TypeLeaseCount:
return "lease-count"
case TypeRateLimit:
return "rate-limit"
}
return "unknown"
}
const (
indexID = "id"
indexName = "name"
indexNamespace = "ns"
indexNamespaceMount = "ns_mount"
)
const (
// StoragePrefix is the prefix for the physical location where quota rules are
// persisted.
StoragePrefix = "quotas/"
// ConfigPath is the physical location where the quota configuration is
// persisted.
ConfigPath = StoragePrefix + "config"
// DefaultRateLimitExemptPathsToggle is the path to a toggle that allows us to
// determine if a Vault operator explicitly modified the exempt paths set for
// rate limit resource quotas. Specifically, when this toggle is false, we can
// infer a Vault node is operating with an initial default set and on a subsequent
// update to that set, we should not overwrite it on Setup.
DefaultRateLimitExemptPathsToggle = StoragePrefix + "default_rate_limit_exempt_paths_toggle"
)
var (
// ErrLeaseCountQuotaExceeded is returned when a request is rejected due to a lease
// count quota being exceeded.
ErrLeaseCountQuotaExceeded = errors.New("lease count quota exceeded")
// ErrRateLimitQuotaExceeded is returned when a request is rejected due to a
// rate limit quota being exceeded.
ErrRateLimitQuotaExceeded = errors.New("rate limit quota exceeded")
)
var defaultExemptPaths = []string{
"/v1/sys/generate-recovery-token/attempt",
"/v1/sys/generate-recovery-token/update",
"/v1/sys/generate-root/attempt",
"/v1/sys/generate-root/update",
"/v1/sys/health",
"/v1/sys/seal-status",
"/v1/sys/unseal",
}
// Access provides information to reach back to the quota checker.
type Access interface {
// QuotaID is the identifier of the quota that issued this access.
QuotaID() string
}
// Ensure that access implements the Access interface.
var _ Access = (*access)(nil)
// access implements the Access interface
type access struct {
quotaID string
}
// QuotaID returns the identifier of the quota rule to which this access refers
// to.
func (a *access) QuotaID() string {
return a.quotaID
}
// Manager holds all the existing quota rules. For any given input. the manager
// checks them against any applicable quota rules.
type Manager struct {
entManager
// db holds the in memory instances of all active quota rules indexed by
// some of the quota properties.
db *memdb.MemDB
// config containing operator preferences and quota behaviors
config *Config
rateLimitPathManager *pathmanager.PathManager
storage logical.Storage
ctx context.Context
logger log.Logger
metricSink *metricsutil.ClusterMetricSink
lock *sync.RWMutex
}
// Quota represents the common properties of every quota type
type Quota interface {
// allow checks the if the request is allowed by the quota type implementation.
allow(context.Context, *Request) (Response, error)
// quotaID is the identifier of the quota rule
quotaID() string
// QuotaName is the name of the quota rule
QuotaName() string
// initialize sets up the fields in the quota type to begin operating
initialize(log.Logger, *metricsutil.ClusterMetricSink) error
// close defines any cleanup behavior that needs to be executed when a quota
// rule is deleted.
close(context.Context) error
// Clone creates a clone of the calling quota
Clone() Quota
// handleRemount updates the mount and namesapce paths of the quota
handleRemount(string, string)
}
// Response holds information about the result of the Allow() call. The response
// can optionally have the Access field set, which is used to reach back into
// the quota rule that sent this response.
type Response struct {
// Allowed is set if the quota allows the request
Allowed bool
// Access is the handle to reach back into the quota rule that processed the
// quota request. This may not be set all the time.
Access Access
// Headers defines any optional headers that may be returned by the quota rule
// to clients.
Headers map[string]string
}
// Config holds operator preferences around quota behaviors
type Config struct {
// EnableRateLimitAuditLogging, if set, starts audit logging of the
// request rejections that arise due to rate limit quota violations.
EnableRateLimitAuditLogging bool `json:"enable_rate_limit_audit_logging"`
// EnableRateLimitResponseHeaders dictates if rate limit quota HTTP headers
// should be added to responses.
EnableRateLimitResponseHeaders bool `json:"enable_rate_limit_response_headers"`
// RateLimitExemptPaths defines the set of exempt paths used for all rate limit
// quotas. Any request path that exists in this set is exempt from rate limiting.
// If the set is empty, no paths are exempt.
RateLimitExemptPaths []string `json:"rate_limit_exempt_paths"`
}
// Request contains information required by the quota manager to query and
// apply the quota rules.
type Request struct {
// Type is the quota type
Type Type
// Path is the request path to which quota rules are being queried for
Path string
// NamespacePath is the namespace path to which the request belongs
NamespacePath string
// MountPath is the mount path to which the request is made
MountPath string
// ClientAddress is client unique addressable string (e.g. IP address). It can
// be empty if the quota type does not need it.
ClientAddress string
}
// NewManager creates and initializes a new quota manager to hold all the quota
// rules and to process incoming requests.
func NewManager(logger log.Logger, walkFunc leaseWalkFunc, ms *metricsutil.ClusterMetricSink) (*Manager, error) {
db, err := memdb.NewMemDB(dbSchema())
if err != nil {
return nil, err
}
manager := &Manager{
db: db,
logger: logger,
metricSink: ms,
rateLimitPathManager: pathmanager.New(),
config: new(Config),
lock: new(sync.RWMutex),
}
manager.init(walkFunc)
return manager, nil
}
// SetQuota adds or updates a quota rule.
func (m *Manager) SetQuota(ctx context.Context, qType string, quota Quota, loading bool) error {
m.lock.Lock()
defer m.lock.Unlock()
return m.setQuotaLocked(ctx, qType, quota, loading)
}
// setQuotaLocked creates a transaction, passes it into setQuotaLockedWithTxn and manages its lifecycle
// along with updating lease quota counts
func (m *Manager) setQuotaLocked(ctx context.Context, qType string, quota Quota, loading bool) error {
txn := m.db.Txn(true)
defer txn.Abort()
err := m.setQuotaLockedWithTxn(ctx, qType, quota, loading, txn)
if err != nil {
return err
}
if loading {
txn.Commit()
return nil
}
// For the lease count type, recompute the counters
if !loading && qType == TypeLeaseCount.String() {
if err := m.recomputeLeaseCounts(ctx, txn); err != nil {
return err
}
}
txn.Commit()
return nil
}
// setQuotaLockedWithTxn adds or updates a quota rule, modifying the db as well as
// any runtime elements such as goroutines, using the transaction passed in
// It should be called with the write lock held.
func (m *Manager) setQuotaLockedWithTxn(ctx context.Context, qType string, quota Quota, loading bool, txn *memdb.Txn) error {
if qType == TypeLeaseCount.String() {
m.setIsPerfStandby(quota)
}
raw, err := txn.First(qType, indexID, quota.quotaID())
if err != nil {
return err
}
// If there already exists an entry in the db, remove that first.
if raw != nil {
quota := raw.(Quota)
if err := quota.close(ctx); err != nil {
return err
}
err = txn.Delete(qType, raw)
if err != nil {
return err
}
}
// Initialize the quota type implementation
if err := quota.initialize(m.logger, m.metricSink); err != nil {
return err
}
// Add the initialized quota type implementation to the db
if err := txn.Insert(qType, quota); err != nil {
return err
}
return nil
}
// QuotaNames returns the names of all the quota rules for a given type
func (m *Manager) QuotaNames(qType Type) ([]string, error) {
m.lock.RLock()
defer m.lock.RUnlock()
return m.quotaNamesLocked(qType)
}
// quotaNamesLocked returns the names of all the quota rules for a given type, and must be called with the lock
func (m *Manager) quotaNamesLocked(qType Type) ([]string, error) {
txn := m.db.Txn(false)
iter, err := txn.Get(qType.String(), indexID)
if err != nil {
return nil, err
}
var names []string
for raw := iter.Next(); raw != nil; raw = iter.Next() {
names = append(names, raw.(Quota).QuotaName())
}
return names, nil
}
// QuotaByID queries for a quota rule in the db for a given quota ID
func (m *Manager) QuotaByID(qType string, id string) (Quota, error) {
m.lock.RLock()
defer m.lock.RUnlock()
txn := m.db.Txn(false)
quotaRaw, err := txn.First(qType, indexID, id)
if err != nil {
return nil, err
}
if quotaRaw == nil {
return nil, nil
}
return quotaRaw.(Quota), nil
}
// QuotaByName queries for a quota rule in the db for a given quota name
func (m *Manager) QuotaByName(qType string, name string) (Quota, error) {
m.lock.RLock()
defer m.lock.RUnlock()
return m.quotaByNameLocked(qType, name)
}
// quotaByNameLocked queries for a quota rule in the db for a given quota name, and must be called with the lock
func (m *Manager) quotaByNameLocked(qType string, name string) (Quota, error) {
txn := m.db.Txn(false)
quotaRaw, err := txn.First(qType, indexName, name)
if err != nil {
return nil, err
}
if quotaRaw == nil {
return nil, nil
}
return quotaRaw.(Quota), nil
}
// QuotaByFactors returns the quota rule that matches the provided factors
func (m *Manager) QuotaByFactors(ctx context.Context, qType, nsPath, mountPath string) (Quota, error) {
m.lock.RLock()
defer m.lock.RUnlock()
// nsPath would have been made non-empty during insertion. Use non-empty value
// during query as well.
if nsPath == "" {
nsPath = "root"
}
idx := indexNamespace
args := []interface{}{nsPath, false}
if mountPath != "" {
idx = indexNamespaceMount
args = []interface{}{nsPath, mountPath}
}
txn := m.db.Txn(false)
iter, err := txn.Get(qType, idx, args...)
if err != nil {
return nil, err
}
var quotas []Quota
for raw := iter.Next(); raw != nil; raw = iter.Next() {
quotas = append(quotas, raw.(Quota))
}
if len(quotas) > 1 {
m.logger.Debug("conflicting quotas in QuotaByFactors", "matching_quotas", quotas)
return nil, fmt.Errorf("conflicting quota definitions detected")
}
if len(quotas) == 0 {
return nil, nil
}
return quotas[0], nil
}
// QueryQuota returns the most specific applicable quota for a given request.
func (m *Manager) QueryQuota(req *Request) (Quota, error) {
m.lock.RLock()
defer m.lock.RUnlock()
return m.queryQuota(nil, req)
}
// queryQuota returns the quota rule that is applicable for the given request. It
// queries all the quota rules that are defined against request values and finds
// the quota rule that takes priority.
//
// Priority rules are as follows:
// - namespace specific quota takes precedence over global quota
// - mount specific quota takes precedence over namespace specific quota
func (m *Manager) queryQuota(txn *memdb.Txn, req *Request) (Quota, error) {
if txn == nil {
txn = m.db.Txn(false)
}
// ns would have been made non-empty during insertion. Use non-empty
// value during query as well.
if req.NamespacePath == "" {
req.NamespacePath = "root"
}
//
// Find a match from most specific applicable quota rule to less specific one.
//
quotaFetchFunc := func(idx string, args ...interface{}) (Quota, error) {
iter, err := txn.Get(req.Type.String(), idx, args...)
if err != nil {
return nil, err
}
var quotas []Quota
for raw := iter.Next(); raw != nil; raw = iter.Next() {
quota := raw.(Quota)
quotas = append(quotas, quota)
}
if len(quotas) > 1 {
m.logger.Debug("conflicting quotas in queryQuota", "matching_quotas", quotas, "args", args)
return nil, fmt.Errorf("conflicting quota definitions detected")
}
if len(quotas) == 0 {
return nil, nil
}
return quotas[0], nil
}
// Fetch mount quota
quota, err := quotaFetchFunc(indexNamespaceMount, req.NamespacePath, req.MountPath)
if err != nil {
return nil, err
}
if quota != nil {
return quota, nil
}
// Fetch ns quota. If NamespacePath is root, this will return the global quota.
quota, err = quotaFetchFunc(indexNamespace, req.NamespacePath, false)
if err != nil {
return nil, err
}
if quota != nil {
return quota, nil
}
// If the request belongs to "root" namespace, then we have already looked at
// global quotas when fetching namespace specific quota rule. When the request
// belongs to a non-root namespace, and when there are no namespace specific
// quota rules present, we fallback on the global quotas.
if req.NamespacePath == "root" {
return nil, nil
}
// Fetch global quota
quota, err = quotaFetchFunc(indexNamespace, "root", false)
if err != nil {
return nil, err
}
if quota != nil {
return quota, nil
}
return nil, nil
}
// DeleteQuota removes a quota rule from the db for a given name
func (m *Manager) DeleteQuota(ctx context.Context, qType string, name string) error {
m.lock.Lock()
defer m.lock.Unlock()
txn := m.db.Txn(true)
defer txn.Abort()
raw, err := txn.First(qType, indexName, name)
if err != nil {
return err
}
if raw == nil {
return nil
}
quota := raw.(Quota)
if err := quota.close(ctx); err != nil {
return err
}
err = txn.Delete(qType, raw)
if err != nil {
return err
}
// For the lease count type, recompute the counters
if qType == TypeLeaseCount.String() {
if err := m.recomputeLeaseCounts(ctx, txn); err != nil {
return err
}
}
txn.Commit()
return nil
}
// ApplyQuota runs the request against any quota rule that is applicable to it. If
// there are multiple quota rule that matches the request parameters, rule that
// takes precedence will be used to allow/reject the request.
func (m *Manager) ApplyQuota(ctx context.Context, req *Request) (Response, error) {
var resp Response
quota, err := m.QueryQuota(req)
if err != nil {
return resp, err
}
// If there is no quota defined, allow the request.
if quota == nil {
resp.Allowed = true
return resp, nil
}
// If the quota type is lease count, and if the path is not known to
// generate leases, allow the request.
if req.Type == TypeLeaseCount && !m.inLeasePathCache(req.Path) {
resp.Allowed = true
return resp, nil
}
return quota.allow(ctx, req)
}
// SetEnableRateLimitAuditLogging updates the operator preference regarding the
// audit logging behavior.
func (m *Manager) SetEnableRateLimitAuditLogging(val bool) {
m.lock.Lock()
defer m.lock.Unlock()
m.setEnableRateLimitAuditLoggingLocked(val)
}
func (m *Manager) setEnableRateLimitAuditLoggingLocked(val bool) {
m.config.EnableRateLimitAuditLogging = val
}
// SetEnableRateLimitResponseHeaders updates the operator preference regarding
// the rate limit quota HTTP header behavior.
func (m *Manager) SetEnableRateLimitResponseHeaders(val bool) {
m.lock.Lock()
defer m.lock.Unlock()
m.setEnableRateLimitResponseHeadersLocked(val)
}
func (m *Manager) setEnableRateLimitResponseHeadersLocked(val bool) {
m.config.EnableRateLimitResponseHeaders = val
}
// SetRateLimitExemptPaths updates the rate limit exempt paths in the Manager's
// configuration in addition to updating the path manager. Every call to
// SetRateLimitExemptPaths will wipe out the existing path manager and set the
// paths based on the provided argument.
func (m *Manager) SetRateLimitExemptPaths(vals []string) {
m.lock.Lock()
defer m.lock.Unlock()
m.setRateLimitExemptPathsLocked(vals)
}
func (m *Manager) setRateLimitExemptPathsLocked(vals []string) {
if vals == nil {
vals = []string{}
}
m.config.RateLimitExemptPaths = vals
m.rateLimitPathManager = pathmanager.New()
m.rateLimitPathManager.AddPaths(vals)
}
// RateLimitAuditLoggingEnabled returns if the quota configuration allows audit
// logging of request rejections due to rate limiting quota rule violations.
func (m *Manager) RateLimitAuditLoggingEnabled() bool {
m.lock.RLock()
defer m.lock.RUnlock()
return m.config.EnableRateLimitAuditLogging
}
// RateLimitResponseHeadersEnabled returns if the quota configuration allows for
// rate limit quota HTTP headers to be added to responses.
func (m *Manager) RateLimitResponseHeadersEnabled() bool {
m.lock.RLock()
defer m.lock.RUnlock()
return m.config.EnableRateLimitResponseHeaders
}
// RateLimitExemptPaths returns the list of exempt paths from all rate limit
// resource quotas from the Manager's configuration.
func (m *Manager) RateLimitExemptPaths() []string {
m.lock.RLock()
defer m.lock.RUnlock()
return m.config.RateLimitExemptPaths
}
// RateLimitPathExempt returns a boolean dictating if a given path is exempt from
// any rate limit quota. If not rate limit path manager is defined, false is
// returned.
func (m *Manager) RateLimitPathExempt(path string) bool {
m.lock.RLock()
defer m.lock.RUnlock()
if m.rateLimitPathManager == nil {
return false
}
return m.rateLimitPathManager.HasPath(path)
}
// Config returns the operator preferences in the quota manager
func (m *Manager) Config() *Config {
return m.config
}
// Reset will clear all the quotas from the db and clear the lease path cache.
func (m *Manager) Reset() error {
m.lock.Lock()
defer m.lock.Unlock()
err := m.resetCache()
if err != nil {
return err
}
m.storage = nil
m.ctx = nil
return m.entManager.Reset()
}
// Must be called with the lock held
func (m *Manager) resetCache() error {
names, err := m.quotaNamesLocked(TypeRateLimit)
if err != nil {
return err
}
for _, name := range names {
quota, err := m.quotaByNameLocked(TypeRateLimit.String(), name)
if err != nil {
return err
}
if quota != nil {
rlq := quota.(*RateLimitQuota)
err = rlq.store.Close(context.Background())
if err != nil {
return err
}
}
}
db, err := memdb.NewMemDB(dbSchema())
if err != nil {
return err
}
m.db = db
return nil
}
// dbSchema creates a DB schema for holding all the quota rules. It creates
// table for each supported type of quota.
func dbSchema() *memdb.DBSchema {
schema := &memdb.DBSchema{
Tables: make(map[string]*memdb.TableSchema),
}
commonSchema := func(name string) *memdb.TableSchema {
return &memdb.TableSchema{
Name: name,
Indexes: map[string]*memdb.IndexSchema{
indexID: {
Name: indexID,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "ID",
},
},
indexName: {
Name: indexName,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Name",
},
},
indexNamespace: {
Name: indexNamespace,
Indexer: &memdb.CompoundMultiIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "NamespacePath",
},
// By sending false as the query parameter, we can
// query just the namespace specific quota.
&memdb.FieldSetIndex{
Field: "MountPath",
},
},
},
},
indexNamespaceMount: {
Name: indexNamespaceMount,
AllowMissing: true,
Indexer: &memdb.CompoundMultiIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "NamespacePath",
},
&memdb.StringFieldIndex{
Field: "MountPath",
},
},
},
},
},
}
}
// Create a table per quota type. This allows names to be reused between
// different quota types and querying a bit easier.
for _, name := range quotaTypes() {
schema.Tables[name] = commonSchema(name)
}
return schema
}
// Invalidate receives notifications from the replication sub-system when a key
// is updated in the storage. This function will read the key from storage and
// updates the caches and data structures to reflect those updates.
func (m *Manager) Invalidate(key string) {
switch key {
case "config":
config, err := LoadConfig(m.ctx, m.storage)
if err != nil {
m.logger.Error("failed to invalidate quota config", "error", err)
return
}
m.SetEnableRateLimitAuditLogging(config.EnableRateLimitAuditLogging)
m.SetEnableRateLimitResponseHeaders(config.EnableRateLimitResponseHeaders)
m.SetRateLimitExemptPaths(config.RateLimitExemptPaths)
default:
splitKeys := strings.Split(key, "/")
if len(splitKeys) != 2 {
m.logger.Error("incorrect key while invalidating quota rule", "key", key)
return
}
qType := splitKeys[0]
name := splitKeys[1]
if qType == TypeLeaseCount.String() && m.isDRSecondary {
// lease count invalidation not supported on DR Secondary
return
}
// Read quota rule from storage
quota, err := Load(m.ctx, m.storage, qType, name)
if err != nil {
m.logger.Error("failed to read invalidated quota rule", "error", err)
return
}
switch {
case quota == nil:
// Handle quota deletion
if err := m.DeleteQuota(m.ctx, qType, name); err != nil {
m.logger.Error("failed to delete invalidated quota rule", "error", err)
return
}
default:
// Handle quota update
if err := m.SetQuota(m.ctx, qType, quota, false); err != nil {
m.logger.Error("failed to update invalidated quota rule", "error", err)
return
}
}
}
}
// LoadConfig reads the quota configuration from the underlying storage
func LoadConfig(ctx context.Context, storage logical.Storage) (*Config, error) {
var config Config
entry, err := storage.Get(ctx, ConfigPath)
if err != nil {
return nil, err
}
if entry == nil {
return &config, nil
}
err = entry.DecodeJSON(&config)
if err != nil {
return nil, err
}
return &config, nil
}
// Load reads the quota rule from the underlying storage
func Load(ctx context.Context, storage logical.Storage, qType, name string) (Quota, error) {
var quota Quota
entry, err := storage.Get(ctx, QuotaStoragePath(qType, name))
if err != nil {
return nil, err
}
if entry == nil {
return nil, nil
}
switch qType {
case TypeRateLimit.String():
quota = &RateLimitQuota{}
case TypeLeaseCount.String():
quota = &LeaseCountQuota{}
default:
return nil, fmt.Errorf("unsupported type: %v", qType)
}
err = entry.DecodeJSON(quota)
if err != nil {
return nil, err
}
return quota, nil
}
// Setup loads the quota configuration and all the quota rules into the
// quota manager.
func (m *Manager) Setup(ctx context.Context, storage logical.Storage, isPerfStandby, isDRSecondary bool) error {
m.lock.Lock()
defer m.lock.Unlock()
m.storage = storage
m.ctx = ctx
m.isPerfStandby = isPerfStandby
m.isDRSecondary = isDRSecondary
// Load the quota configuration from storage and load it into the quota
// manager.
config, err := LoadConfig(ctx, storage)
if err != nil {
return err
}
entry, err := storage.Get(ctx, DefaultRateLimitExemptPathsToggle)
if err != nil {
return err
}
// Determine if we need to set the default set of exempt paths for rate limit
// resource quotas. We use a default set introduced in 1.5 when the toggle
// entry does not exist in storage or is false. The toggle is flipped , i.e.
// set to true when SetRateLimitExemptPaths is called during a config update.
var toggle bool
if entry != nil {
if err := entry.DecodeJSON(&toggle); err != nil {
return err
}
}
exemptPaths := defaultExemptPaths
if toggle {
exemptPaths = config.RateLimitExemptPaths
}
m.setEnableRateLimitAuditLoggingLocked(config.EnableRateLimitAuditLogging)
m.setEnableRateLimitResponseHeadersLocked(config.EnableRateLimitResponseHeaders)
m.setRateLimitExemptPathsLocked(exemptPaths)
if err = m.resetCache(); err != nil {
return err
}
for _, qType := range quotaTypes() {
m.setupQuotaType(ctx, storage, qType)
}
return nil
}
func (m *Manager) setupQuotaType(ctx context.Context, storage logical.Storage, quotaType string) error {
if quotaType == TypeLeaseCount.String() && m.isDRSecondary {
m.logger.Trace("lease count quotas are not processed on DR Secondaries")
return nil
}
names, err := logical.CollectKeys(ctx, logical.NewStorageView(storage, StoragePrefix+quotaType+"/"))
if err != nil {
return err
}
for _, name := range names {
quota, err := Load(ctx, m.storage, quotaType, name)
if err != nil {
return err
}
if quota == nil {
continue
}
err = m.setQuotaLocked(ctx, quotaType, quota, true)
if err != nil {
return err
}
}
return nil
}
// QuotaStoragePath returns the storage path suffix for persisting the quota
// rule.
func QuotaStoragePath(quotaType, name string) string {
return path.Join(StoragePrefix+quotaType, name)
}
// HandleRemount updates the quota subsystem about the remount operation that
// took place. Quota manager will trigger the quota specific updates including
// the mount path update and the namespace update
func (m *Manager) HandleRemount(ctx context.Context, from, to namespace.MountPathDetails) error {
m.lock.Lock()
defer m.lock.Unlock()
// Grab a write transaction, as we want to save the updated quota in memdb
txn := m.db.Txn(true)
defer txn.Abort()
// quota namespace would have been made non-empty during insertion. Use non-empty value
// during query as well.
fromNs := from.Namespace.Path
if fromNs == "" {
fromNs = namespace.RootNamespaceID
}
toNs := to.Namespace.Path
if toNs == "" {
toNs = namespace.RootNamespaceID