Skip to content

Commit

Permalink
[Improve] Supplement schema admin api (#1215)
Browse files Browse the repository at this point in the history
### Motivation
To keep consistent with the Java client.

### Modifications
- CreateSchemaBySchemaInfo
- GetVersionBySchemaInfo
- GetVersionByPayload
- TestCompatibilityWithSchemaInfo
- TestCompatibilityWithPostSchemaPayload
  • Loading branch information
crossoverJie committed May 10, 2024
1 parent 007d14e commit 49fce72
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 0 deletions.
59 changes: 59 additions & 0 deletions pulsaradmin/pkg/admin/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ type Schema interface {

// CreateSchemaByPayload creates a schema for a given <tt>topic</tt>
CreateSchemaByPayload(topic string, schemaPayload utils.PostSchemaPayload) error

// CreateSchemaBySchemaInfo creates a schema for a given <tt>topic</tt>
CreateSchemaBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) error

// GetVersionBySchemaInfo gets the version of a schema
GetVersionBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) (int64, error)

// GetVersionByPayload gets the version of a schema
GetVersionByPayload(topic string, schemaPayload utils.PostSchemaPayload) (int64, error)

// TestCompatibilityWithSchemaInfo tests compatibility with a schema
TestCompatibilityWithSchemaInfo(topic string, schemaInfo utils.SchemaInfo) (*utils.IsCompatibility, error)

// TestCompatibilityWithPostSchemaPayload tests compatibility with a schema
TestCompatibilityWithPostSchemaPayload(topic string,
schemaPayload utils.PostSchemaPayload) (*utils.IsCompatibility, error)
}

type schemas struct {
Expand Down Expand Up @@ -148,3 +164,46 @@ func (s *schemas) CreateSchemaByPayload(topic string, schemaPayload utils.PostSc

return s.pulsar.Client.Post(endpoint, &schemaPayload)
}

func (s *schemas) CreateSchemaBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) error {
schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo)
return s.CreateSchemaByPayload(topic, schemaPayload)
}

func (s *schemas) GetVersionBySchemaInfo(topic string, schemaInfo utils.SchemaInfo) (int64, error) {
schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo)
return s.GetVersionByPayload(topic, schemaPayload)
}

func (s *schemas) GetVersionByPayload(topic string, schemaPayload utils.PostSchemaPayload) (int64, error) {
topicName, err := utils.GetTopicName(topic)
if err != nil {
return 0, err
}
version := struct {
Version int64 `json:"version"`
}{}
endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(),
topicName.GetLocalName(), "version")
err = s.pulsar.Client.PostWithObj(endpoint, &schemaPayload, &version)
return version.Version, err
}

func (s *schemas) TestCompatibilityWithSchemaInfo(topic string,
schemaInfo utils.SchemaInfo) (*utils.IsCompatibility, error) {
schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo)
return s.TestCompatibilityWithPostSchemaPayload(topic, schemaPayload)
}

func (s *schemas) TestCompatibilityWithPostSchemaPayload(topic string,
schemaPayload utils.PostSchemaPayload) (*utils.IsCompatibility, error) {
topicName, err := utils.GetTopicName(topic)
if err != nil {
return nil, err
}
var isCompatibility utils.IsCompatibility
endpoint := s.pulsar.endpoint(s.basePath, topicName.GetTenant(), topicName.GetNamespace(),
topicName.GetLocalName(), "compatibility")
err = s.pulsar.Client.PostWithObj(endpoint, &schemaPayload, &isCompatibility)
return &isCompatibility, err
}
45 changes: 45 additions & 0 deletions pulsaradmin/pkg/admin/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,48 @@ func TestSchemas_ForceDeleteSchema(t *testing.T) {
assert.Errorf(t, err, "Schema not found")

}

func TestSchemas_CreateSchemaBySchemaInfo(t *testing.T) {
cfg := &config.Config{}
admin, err := New(cfg)
assert.NoError(t, err)
assert.NotNil(t, admin)

schemaInfo := utils.SchemaInfo{
Schema: []byte(""),
Type: "STRING",
}
topic := fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
err = admin.Schemas().CreateSchemaBySchemaInfo(topic, schemaInfo)
assert.NoError(t, err)

info, err := admin.Schemas().GetSchemaInfo(topic)
assert.NoError(t, err)
assert.Equal(t, schemaInfo.Type, info.Type)

version, err := admin.Schemas().GetVersionBySchemaInfo(topic, schemaInfo)
assert.NoError(t, err)
assert.Equal(t, version, int64(0))

schemaPayload := utils.ConvertSchemaInfoToPostSchemaPayload(schemaInfo)
version, err = admin.Schemas().GetVersionByPayload(topic, schemaPayload)
assert.NoError(t, err)
assert.Equal(t, version, int64(0))

compatibility, err := admin.Schemas().TestCompatibilityWithSchemaInfo(topic, schemaInfo)
assert.NoError(t, err)
assert.Equal(t, compatibility.IsCompatibility, true)
assert.Equal(t, compatibility.SchemaCompatibilityStrategy, utils.SchemaCompatibilityStrategy("FULL"))

compatibility, err = admin.Schemas().TestCompatibilityWithPostSchemaPayload(topic, schemaPayload)
assert.NoError(t, err)
assert.Equal(t, compatibility.IsCompatibility, true)
assert.Equal(t, compatibility.SchemaCompatibilityStrategy, utils.SchemaCompatibilityStrategy("FULL"))

err = admin.Schemas().ForceDeleteSchema(topic)
assert.NoError(t, err)

_, err = admin.Schemas().GetSchemaInfo(topic)
assert.Errorf(t, err, "Schema not found")

}
23 changes: 23 additions & 0 deletions pulsaradmin/pkg/utils/schema_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ type GetSchemaResponse struct {
Properties map[string]string `json:"properties"`
}

type IsCompatibility struct {
IsCompatibility bool `json:"compatibility"`
SchemaCompatibilityStrategy SchemaCompatibilityStrategy `json:"schemaCompatibilityStrategy"`
}

func ConvertGetSchemaResponseToSchemaInfo(tn *TopicName, response GetSchemaResponse) *SchemaInfo {
info := new(SchemaInfo)
schema := make([]byte, 0, 10)
Expand All @@ -61,6 +66,24 @@ func ConvertGetSchemaResponseToSchemaInfo(tn *TopicName, response GetSchemaRespo
return info
}

func ConvertSchemaDataToStringLegacy(schemaInfo SchemaInfo) string {
schema := schemaInfo.Schema
if schema == nil {
return ""
}
// TODO: KEY_VALUE
return string(schema)

}

func ConvertSchemaInfoToPostSchemaPayload(schemaInfo SchemaInfo) PostSchemaPayload {
return PostSchemaPayload{
SchemaType: schemaInfo.Type,
Schema: ConvertSchemaDataToStringLegacy(schemaInfo),
Properties: schemaInfo.Properties,
}
}

func ConvertGetSchemaResponseToSchemaInfoWithVersion(tn *TopicName, response GetSchemaResponse) *SchemaInfoWithVersion {
info := new(SchemaInfoWithVersion)
info.SchemaInfo = ConvertGetSchemaResponseToSchemaInfo(tn, response)
Expand Down

0 comments on commit 49fce72

Please sign in to comment.