Skip to content

Commit

Permalink
Plugins: Introduce Plugin Registry (grafana#47200)
Browse files Browse the repository at this point in the history
* introduce registry write/read separation

* internal + external registries

* fix tests

* fixup

* rename

* move interfaces

* back to plugins.Store

* fix registry name

* remove context.TODOs

* remove some ctx for now

* tidy

* remove dupe logic

* update naming

* move from manager.go to store

* amend logger name

* new store writer svc

* restrict changes

* more simplifying

* move interfaces around

* remove unused

* fix linter

* tidy

* add registry test

* fix tests

* revert testdata changes

* revert testdata changes #1

* revert testdata changes #2

* revert testdata changes #3

* revert testdata changes #4

* revert testdata changes #5

* revert testdata changes

* fixup testdata

* remove unused log

* update naming in test

* adjust ctx in test
  • Loading branch information
wbrowne committed Jun 3, 2022
1 parent 6c7b6a7 commit 7536647
Show file tree
Hide file tree
Showing 15 changed files with 547 additions and 158 deletions.
2 changes: 1 addition & 1 deletion pkg/plugins/backendplugin/errors.go
Expand Up @@ -3,7 +3,7 @@ package backendplugin
import "errors"

var (
// ErrPluginNotRegistered error returned when plugin not registered.
// ErrPluginNotRegistered error returned when plugin is not registered.
ErrPluginNotRegistered = errors.New("plugin not registered")
// ErrHealthCheckFailed error returned when health check failed.
ErrHealthCheckFailed = errors.New("health check failed")
Expand Down
16 changes: 0 additions & 16 deletions pkg/plugins/ifaces.go
Expand Up @@ -21,22 +21,6 @@ type Store interface {
Remove(ctx context.Context, pluginID string) error
}

// Loader is responsible for loading plugins from the file system.
type Loader interface {
// Load will return a list of plugins found in the provided file system paths.
Load(ctx context.Context, class Class, paths []string, ignore map[string]struct{}) ([]*Plugin, error)
}

// Installer is responsible for managing plugins (add / remove) on the file system.
type Installer interface {
// Install downloads the requested plugin in the provided file system location.
Install(ctx context.Context, pluginID, version, pluginsDir, pluginZipURL, pluginRepoURL string) error
// Uninstall removes the requested plugin from the provided file system location.
Uninstall(ctx context.Context, pluginDir string) error
// GetUpdateInfo provides update information for the requested plugin.
GetUpdateInfo(ctx context.Context, pluginID, version, pluginRepoURL string) (UpdateInfo, error)
}

type UpdateInfo struct {
PluginZipURL string
}
Expand Down
15 changes: 7 additions & 8 deletions pkg/plugins/manager/client.go
Expand Up @@ -12,7 +12,7 @@ import (
)

func (m *PluginManager) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
plugin, exists := m.plugin(req.PluginContext.PluginID)
plugin, exists := m.plugin(ctx, req.PluginContext.PluginID)
if !exists {
return nil, backendplugin.ErrPluginNotRegistered
}
Expand Down Expand Up @@ -48,11 +48,10 @@ func (m *PluginManager) QueryData(ctx context.Context, req *backend.QueryDataReq
}

func (m *PluginManager) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
p, exists := m.plugin(req.PluginContext.PluginID)
p, exists := m.plugin(ctx, req.PluginContext.PluginID)
if !exists {
return backendplugin.ErrPluginNotRegistered
}

err := instrumentation.InstrumentCallResourceRequest(p.PluginID(), func() error {
if err := p.CallResource(ctx, req, sender); err != nil {
return err
Expand All @@ -68,7 +67,7 @@ func (m *PluginManager) CallResource(ctx context.Context, req *backend.CallResou
}

func (m *PluginManager) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) {
p, exists := m.plugin(req.PluginContext.PluginID)
p, exists := m.plugin(ctx, req.PluginContext.PluginID)
if !exists {
return nil, backendplugin.ErrPluginNotRegistered
}
Expand All @@ -86,7 +85,7 @@ func (m *PluginManager) CollectMetrics(ctx context.Context, req *backend.Collect
}

func (m *PluginManager) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
p, exists := m.plugin(req.PluginContext.PluginID)
p, exists := m.plugin(ctx, req.PluginContext.PluginID)
if !exists {
return nil, backendplugin.ErrPluginNotRegistered
}
Expand All @@ -113,7 +112,7 @@ func (m *PluginManager) CheckHealth(ctx context.Context, req *backend.CheckHealt
}

func (m *PluginManager) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
plugin, exists := m.plugin(req.PluginContext.PluginID)
plugin, exists := m.plugin(ctx, req.PluginContext.PluginID)
if !exists {
return nil, backendplugin.ErrPluginNotRegistered
}
Expand All @@ -122,7 +121,7 @@ func (m *PluginManager) SubscribeStream(ctx context.Context, req *backend.Subscr
}

func (m *PluginManager) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) {
plugin, exists := m.plugin(req.PluginContext.PluginID)
plugin, exists := m.plugin(ctx, req.PluginContext.PluginID)
if !exists {
return nil, backendplugin.ErrPluginNotRegistered
}
Expand All @@ -131,7 +130,7 @@ func (m *PluginManager) PublishStream(ctx context.Context, req *backend.PublishS
}

func (m *PluginManager) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error {
plugin, exists := m.plugin(req.PluginContext.PluginID)
plugin, exists := m.plugin(ctx, req.PluginContext.PluginID)
if !exists {
return backendplugin.ErrPluginNotRegistered
}
Expand Down
44 changes: 23 additions & 21 deletions pkg/plugins/manager/dashboard_file_store_test.go
Expand Up @@ -190,30 +190,32 @@ func setupPluginDashboardsForTest(t *testing.T) *PluginManager {
t.Helper()

return &PluginManager{
store: map[string]*plugins.Plugin{
"pluginWithoutDashboards": {
JSONData: plugins.JSONData{
Includes: []*plugins.Includes{
{
Type: "page",
pluginRegistry: &fakePluginRegistry{
store: map[string]*plugins.Plugin{
"pluginWithoutDashboards": {
JSONData: plugins.JSONData{
Includes: []*plugins.Includes{
{
Type: "page",
},
},
},
},
},
"pluginWithDashboards": {
PluginDir: "plugins/plugin-id",
JSONData: plugins.JSONData{
Includes: []*plugins.Includes{
{
Type: "page",
},
{
Type: "dashboard",
Path: "dashboards/dash1.json",
},
{
Type: "dashboard",
Path: "dashboards/dash2.json",
"pluginWithDashboards": {
PluginDir: "plugins/plugin-id",
JSONData: plugins.JSONData{
Includes: []*plugins.Includes{
{
Type: "page",
},
{
Type: "dashboard",
Path: "dashboards/dash1.json",
},
{
Type: "dashboard",
Path: "dashboards/dash2.json",
},
},
},
},
Expand Down
16 changes: 16 additions & 0 deletions pkg/plugins/manager/installer/ifaces.go
@@ -1,5 +1,21 @@
package installer

import (
"context"

"github.com/grafana/grafana/pkg/plugins"
)

// Service is responsible for managing plugins (add / remove) on the file system.
type Service interface {
// Install downloads the requested plugin in the provided file system location.
Install(ctx context.Context, pluginID, version, pluginsDir, pluginZipURL, pluginRepoURL string) error
// Uninstall removes the requested plugin from the provided file system location.
Uninstall(ctx context.Context, pluginDir string) error
// GetUpdateInfo provides update information for the requested plugin.
GetUpdateInfo(ctx context.Context, pluginID, version, pluginRepoURL string) (plugins.UpdateInfo, error)
}

type Logger interface {
Successf(format string, args ...interface{})
Failuref(format string, args ...interface{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/plugins/manager/installer/installer.go
Expand Up @@ -80,7 +80,7 @@ func (e ErrVersionNotFound) Error() string {
return fmt.Sprintf("%s v%s either does not exist or is not supported on your system (%s)", e.PluginID, e.RequestedVersion, e.SystemInfo)
}

func New(skipTLSVerify bool, grafanaVersion string, logger Logger) plugins.Installer {
func New(skipTLSVerify bool, grafanaVersion string, logger Logger) Service {
return &Installer{
httpClient: makeHttpClient(skipTLSVerify, 10*time.Second),
httpClientNoTimeout: makeHttpClient(skipTLSVerify, 0),
Expand Down
13 changes: 13 additions & 0 deletions pkg/plugins/manager/loader/ifaces.go
@@ -0,0 +1,13 @@
package loader

import (
"context"

"github.com/grafana/grafana/pkg/plugins"
)

// Service is responsible for loading plugins from the file system.
type Service interface {
// Load will return a list of plugins found in the provided file system paths.
Load(ctx context.Context, class plugins.Class, paths []string, ignore map[string]struct{}) ([]*plugins.Plugin, error)
}
54 changes: 20 additions & 34 deletions pkg/plugins/manager/manager.go
Expand Up @@ -3,7 +3,6 @@ package manager
import (
"context"
"errors"
"fmt"
"path/filepath"
"sync"
"time"
Expand All @@ -12,6 +11,8 @@ import (
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/plugins/manager/installer"
"github.com/grafana/grafana/pkg/plugins/manager/loader"
"github.com/grafana/grafana/pkg/plugins/manager/registry"
"github.com/grafana/grafana/pkg/setting"
)

Expand All @@ -26,9 +27,9 @@ var _ plugins.RendererManager = (*PluginManager)(nil)

type PluginManager struct {
cfg *plugins.Cfg
store map[string]*plugins.Plugin
pluginInstaller plugins.Installer
pluginLoader plugins.Loader
pluginRegistry registry.Service
pluginInstaller installer.Service
pluginLoader loader.Service
pluginsMu sync.RWMutex
pluginSources []PluginSource
log log.Logger
Expand All @@ -39,8 +40,8 @@ type PluginSource struct {
Paths []string
}

func ProvideService(grafanaCfg *setting.Cfg, pluginLoader plugins.Loader) (*PluginManager, error) {
pm := New(plugins.FromGrafanaCfg(grafanaCfg), []PluginSource{
func ProvideService(grafanaCfg *setting.Cfg, pluginRegistry registry.Service, pluginLoader loader.Service) (*PluginManager, error) {
pm := New(plugins.FromGrafanaCfg(grafanaCfg), pluginRegistry, []PluginSource{
{Class: plugins.Core, Paths: corePluginPaths(grafanaCfg)},
{Class: plugins.Bundled, Paths: []string{grafanaCfg.BundledPluginsPath}},
{Class: plugins.External, Paths: append([]string{grafanaCfg.PluginsPath}, pluginSettingPaths(grafanaCfg)...)},
Expand All @@ -51,12 +52,12 @@ func ProvideService(grafanaCfg *setting.Cfg, pluginLoader plugins.Loader) (*Plug
return pm, nil
}

func New(cfg *plugins.Cfg, pluginSources []PluginSource, pluginLoader plugins.Loader) *PluginManager {
func New(cfg *plugins.Cfg, pluginRegistry registry.Service, pluginSources []PluginSource, pluginLoader loader.Service) *PluginManager {
return &PluginManager{
cfg: cfg,
pluginLoader: pluginLoader,
pluginSources: pluginSources,
store: make(map[string]*plugins.Plugin),
pluginRegistry: pluginRegistry,
log: log.New("plugin.manager"),
pluginInstaller: installer.New(false, cfg.BuildVersion, newInstallerLogger("plugin.installer", true)),
}
Expand Down Expand Up @@ -91,7 +92,7 @@ func (m *PluginManager) loadPlugins(ctx context.Context, class plugins.Class, pa
}
}

loadedPlugins, err := m.pluginLoader.Load(ctx, class, pluginPaths, m.registeredPlugins())
loadedPlugins, err := m.pluginLoader.Load(ctx, class, pluginPaths, m.registeredPlugins(ctx))
if err != nil {
m.log.Error("Could not load plugins", "paths", pluginPaths, "err", err)
return err
Expand All @@ -107,7 +108,7 @@ func (m *PluginManager) loadPlugins(ctx context.Context, class plugins.Class, pa
}

func (m *PluginManager) Renderer() *plugins.Plugin {
for _, p := range m.plugins() {
for _, p := range m.availablePlugins(context.TODO()) {
if p.IsRenderer() {
return p
}
Expand All @@ -119,41 +120,24 @@ func (m *PluginManager) Renderer() *plugins.Plugin {
func (m *PluginManager) Routes() []*plugins.StaticRoute {
staticRoutes := make([]*plugins.StaticRoute, 0)

for _, p := range m.plugins() {
for _, p := range m.availablePlugins(context.TODO()) {
if p.StaticRoute() != nil {
staticRoutes = append(staticRoutes, p.StaticRoute())
}
}
return staticRoutes
}

func (m *PluginManager) registerAndStart(ctx context.Context, plugin *plugins.Plugin) error {
err := m.register(plugin)
if err != nil {
func (m *PluginManager) registerAndStart(ctx context.Context, p *plugins.Plugin) error {
if err := m.pluginRegistry.Add(ctx, p); err != nil {
return err
}

if !m.isRegistered(plugin.ID) {
return fmt.Errorf("plugin %s is not registered", plugin.ID)
}

return m.start(ctx, plugin)
}

func (m *PluginManager) register(p *plugins.Plugin) error {
if m.isRegistered(p.ID) {
return fmt.Errorf("plugin %s is already registered", p.ID)
}

m.pluginsMu.Lock()
m.store[p.ID] = p
m.pluginsMu.Unlock()

if !p.IsCorePlugin() {
m.log.Info("Plugin registered", "pluginId", p.ID)
}

return nil
return m.start(ctx, p)
}

func (m *PluginManager) unregisterAndStop(ctx context.Context, p *plugins.Plugin) error {
Expand All @@ -169,7 +153,9 @@ func (m *PluginManager) unregisterAndStop(ctx context.Context, p *plugins.Plugin
return err
}

delete(m.store, p.ID)
if err := m.pluginRegistry.Remove(ctx, p.ID); err != nil {
return err
}

m.log.Debug("Plugin unregistered", "pluginId", p.ID)
return nil
Expand All @@ -181,7 +167,7 @@ func (m *PluginManager) start(ctx context.Context, p *plugins.Plugin) error {
return nil
}

if !m.isRegistered(p.ID) {
if _, exists := m.pluginRegistry.Plugin(ctx, p.ID); !exists {
return backendplugin.ErrPluginNotRegistered
}

Expand Down Expand Up @@ -245,7 +231,7 @@ func restartKilledProcess(ctx context.Context, p *plugins.Plugin) error {
// shutdown stops all backend plugin processes
func (m *PluginManager) shutdown(ctx context.Context) {
var wg sync.WaitGroup
for _, p := range m.plugins() {
for _, p := range m.availablePlugins(ctx) {
wg.Add(1)
go func(p backendplugin.Plugin, ctx context.Context) {
defer wg.Done()
Expand Down

0 comments on commit 7536647

Please sign in to comment.