Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing API operations to Schema Registry client #1181

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -5,3 +5,4 @@ tmp-build
.DS_Store
.idea
.go-version
.vscode/
98 changes: 61 additions & 37 deletions schemaregistry/api.html
Expand Up @@ -14,7 +14,7 @@
<script defer="" src="https://go.dev/js/jquery.js">
</script>
<script>
var goVersion = "go1.21.8";
var goVersion = "go1.21.0";
</script>
<script defer="" src="https://go.dev/js/godocs.js">
</script>
Expand Down Expand Up @@ -207,6 +207,11 @@ <h2 class="toggleButton" title="Click to hide Index section">
func (sd *SchemaMetadata) UnmarshalJSON(b []byte) error
</a>
</dd>
<dd>
<a href="#SubjectAndVersion">
type SubjectAndVersion
</a>
</dd>
</dl>
</div>
<!-- #manual-nav -->
Expand Down Expand Up @@ -258,7 +263,7 @@ <h2 id="pkg-constants">
)</pre>
<h2 id="Client">
type
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=6747:7897#L194">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=7229:8502#L206">
Client
</a>
<a class="permalink" href="#Client">
Expand All @@ -272,8 +277,10 @@ <h2 id="Client">
https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/SchemaRegistryClient.java
</a>
<pre>type Client interface {
GetAllContexts() ([]<a href="https://pkg.go.dev/builtin/#string">string</a>, <a href="https://pkg.go.dev/builtin/#error">error</a>)
Register(subject <a href="https://pkg.go.dev/builtin/#string">string</a>, schema <a href="#SchemaInfo">SchemaInfo</a>, normalize <a href="https://pkg.go.dev/builtin/#bool">bool</a>) (id <a href="https://pkg.go.dev/builtin/#int">int</a>, err <a href="https://pkg.go.dev/builtin/#error">error</a>)
GetBySubjectAndID(subject <a href="https://pkg.go.dev/builtin/#string">string</a>, id <a href="https://pkg.go.dev/builtin/#int">int</a>) (schema <a href="#SchemaInfo">SchemaInfo</a>, err <a href="https://pkg.go.dev/builtin/#error">error</a>)
GetSubjectsAndVersionsByID(id <a href="https://pkg.go.dev/builtin/#int">int</a>) (subjectAndVersion []<a href="#SubjectAndVersion">SubjectAndVersion</a>, err <a href="https://pkg.go.dev/builtin/#error">error</a>)
GetID(subject <a href="https://pkg.go.dev/builtin/#string">string</a>, schema <a href="#SchemaInfo">SchemaInfo</a>, normalize <a href="https://pkg.go.dev/builtin/#bool">bool</a>) (id <a href="https://pkg.go.dev/builtin/#int">int</a>, err <a href="https://pkg.go.dev/builtin/#error">error</a>)
GetLatestSchemaMetadata(subject <a href="https://pkg.go.dev/builtin/#string">string</a>) (<a href="#SchemaMetadata">SchemaMetadata</a>, <a href="https://pkg.go.dev/builtin/#error">error</a>)
GetSchemaMetadata(subject <a href="https://pkg.go.dev/builtin/#string">string</a>, version <a href="https://pkg.go.dev/builtin/#int">int</a>) (<a href="#SchemaMetadata">SchemaMetadata</a>, <a href="https://pkg.go.dev/builtin/#error">error</a>)
Expand All @@ -290,7 +297,7 @@ <h2 id="Client">
}</pre>
<h3 id="NewClient">
func
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=7944:7988#L213">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=8549:8593#L227">
NewClient
</a>
<a class="permalink" href="#NewClient">
Expand All @@ -302,7 +309,7 @@ <h3 id="NewClient">
NewClient returns a Client implementation
<h2 id="Compatibility">
type
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=19493:19515#L595">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=20756:20778#L625">
Compatibility
</a>
<a class="permalink" href="#Compatibility">
Expand All @@ -314,7 +321,7 @@ <h2 id="Compatibility">
<pre>type Compatibility <a href="https://pkg.go.dev/builtin/#int">int</a></pre>
<h3 id="Compatibility.MarshalJSON">
func (Compatibility)
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=20270:20322#L633">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=21533:21585#L663">
MarshalJSON
</a>
<a class="permalink" href="#Compatibility.MarshalJSON">
Expand All @@ -326,7 +333,7 @@ <h3 id="Compatibility.MarshalJSON">
MarshalJSON implements json.Marshaler
<h3 id="Compatibility.ParseString">
func (*Compatibility)
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=20743:20796#L652">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=22006:22059#L682">
ParseString
</a>
<a class="permalink" href="#Compatibility.ParseString">
Expand All @@ -338,7 +345,7 @@ <h3 id="Compatibility.ParseString">
ParseString returns a Compatibility for the given string
<h3 id="Compatibility.String">
func (Compatibility)
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=20610:20648#L647">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=21873:21911#L677">
String
</a>
<a class="permalink" href="#Compatibility.String">
Expand All @@ -348,7 +355,7 @@ <h3 id="Compatibility.String">
<pre>func (c <a href="#Compatibility">Compatibility</a>) String() <a href="https://pkg.go.dev/builtin/#string">string</a></pre>
<h3 id="Compatibility.UnmarshalJSON">
func (*Compatibility)
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=20406:20459#L638">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=21669:21722#L668">
UnmarshalJSON
</a>
<a class="permalink" href="#Compatibility.UnmarshalJSON">
Expand Down Expand Up @@ -471,7 +478,7 @@ <h3 id="NewConfigWithBearerAuthentication">
identityPoolID(`bearer.auth.identity.pool.id`) is required
<h2 id="Reference">
type
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=3043:3164#L62">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=3342:3463#L68">
Reference
</a>
<a class="permalink" href="#Reference">
Expand All @@ -488,7 +495,7 @@ <h2 id="Reference">
</pre>
<h2 id="RestError">
type
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/rest_service.go?s=2676:2770#L87">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/rest_service.go?s=2865:2959#L89">
RestError
</a>
<a class="permalink" href="#RestError">
Expand All @@ -504,7 +511,7 @@ <h2 id="RestError">
</pre>
<h3 id="RestError.Error">
func (*RestError)
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/rest_service.go?s=2819:2855#L93">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/rest_service.go?s=3008:3044#L95">
Error
</a>
<a class="permalink" href="#RestError.Error">
Expand All @@ -516,7 +523,7 @@ <h3 id="RestError.Error">
Error implements the errors.Error interface
<h2 id="SchemaInfo">
type
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=3216:3400#L69">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=3515:3699#L75">
SchemaInfo
</a>
<a class="permalink" href="#SchemaInfo">
Expand All @@ -533,7 +540,7 @@ <h2 id="SchemaInfo">
</pre>
<h3 id="SchemaInfo.MarshalJSON">
func (*SchemaInfo)
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=3457:3508#L76">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=3939:3990#L88">
MarshalJSON
</a>
<a class="permalink" href="#SchemaInfo.MarshalJSON">
Expand All @@ -545,7 +552,7 @@ <h3 id="SchemaInfo.MarshalJSON">
MarshalJSON implements the json.Marshaler interface
<h3 id="SchemaInfo.UnmarshalJSON">
func (*SchemaInfo)
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=3821:3872#L89">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=4303:4354#L101">
UnmarshalJSON
</a>
<a class="permalink" href="#SchemaInfo.UnmarshalJSON">
Expand All @@ -557,7 +564,7 @@ <h3 id="SchemaInfo.UnmarshalJSON">
UnmarshalJSON implements the json.Unmarshaller interface
<h2 id="SchemaMetadata">
type
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=4254:4420#L107">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=4736:4902#L119">
SchemaMetadata
</a>
<a class="permalink" href="#SchemaMetadata">
Expand All @@ -575,7 +582,7 @@ <h2 id="SchemaMetadata">
</pre>
<h3 id="SchemaMetadata.MarshalJSON">
func (*SchemaMetadata)
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=4477:4532#L115">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=4959:5014#L127">
MarshalJSON
</a>
<a class="permalink" href="#SchemaMetadata.MarshalJSON">
Expand All @@ -587,7 +594,7 @@ <h3 id="SchemaMetadata.MarshalJSON">
MarshalJSON implements the json.Marshaler interface
<h3 id="SchemaMetadata.UnmarshalJSON">
func (*SchemaMetadata)
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=5033:5088#L134">
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=5515:5570#L146">
UnmarshalJSON
</a>
<a class="permalink" href="#SchemaMetadata.UnmarshalJSON">
Expand All @@ -597,30 +604,47 @@ <h3 id="SchemaMetadata.UnmarshalJSON">
<pre>func (sd *<a href="#SchemaMetadata">SchemaMetadata</a>) UnmarshalJSON(b []<a href="https://pkg.go.dev/builtin/#byte">byte</a>) <a href="https://pkg.go.dev/builtin/#error">error</a></pre>
<p>
UnmarshalJSON implements the json.Unmarshaller interface
<div id="footer">
Build version go1.21.8.
<br/>
Except as
<a href="https://developers.google.com/site-policies#restrictions">
noted
<h2 id="SubjectAndVersion">
type
<a href="https://github.com/confluentinc/confluent-kafka-go/blob/v2.4.0-RC2/schemaregistry/schemaregistry_client.go?s=3763:3882#L82">
SubjectAndVersion
</a>
,
<a class="permalink" href="#SubjectAndVersion">
</a>
</h2>
<p>
SubjectAndVersion represents a pair of subject and version
<pre>type SubjectAndVersion struct {
<span id="SubjectAndVersion.Subject"></span> Subject <a href="https://pkg.go.dev/builtin/#string">string</a> `json:"subject,omitempty"`
<span id="SubjectAndVersion.Version"></span> Version <a href="https://pkg.go.dev/builtin/#int">int</a> `json:"version,omitempty"`
}
</pre>
<div id="footer">
Build version go1.21.0.
<br/>
Except as
<a href="https://developers.google.com/site-policies#restrictions">
noted
</a>
,
the content of this page is licensed under the
Creative Commons Attribution 3.0 License,
and code is licensed under a
<a href="https://go.dev/LICENSE">
BSD license
</a>
.
<br/>
<a href="https://golang.org/doc/tos.html">
Terms of Service
</a>
|
<a href="https://www.google.com/intl/en/policies/privacy/">
Privacy Policy
</a>
</div>
<a href="https://go.dev/LICENSE">
BSD license
</a>
.
<br/>
<a href="https://golang.org/doc/tos.html">
Terms of Service
</a>
|
<a href="https://www.google.com/intl/en/policies/privacy/">
Privacy Policy
</a>
</div>
</p>
</p>
</p>
</p>
Expand Down
44 changes: 44 additions & 0 deletions schemaregistry/mock_schemaregistry_client.go
Expand Up @@ -67,6 +67,12 @@ type mockclient struct {

var _ Client = new(mockclient)

// Fetch all contexts used
// Returns a string slice containing contexts
func (c *mockclient) GetAllContexts() ([]string, error) {
return []string{"."}, nil
}

// Register registers Schema aliased with subject
func (c *mockclient) Register(subject string, schema SchemaInfo, normalize bool) (id int, err error) {
schemaJSON, err := schema.MarshalJSON()
Expand Down Expand Up @@ -167,6 +173,44 @@ func (c *mockclient) GetBySubjectAndID(subject string, id int) (schema SchemaInf
return SchemaInfo{}, &posErr
}

func (c *mockclient) GetSubjectsAndVersionsByID(id int) (subjectsAndVersions []SubjectAndVersion, err error) {
subjectsAndVersions = make([]SubjectAndVersion, 0)
subjectsAndSchemas := make([]subjectJSON, 0)

c.schemaToIDCacheLock.RLock()
for key, value := range c.schemaToIDCache {
if !value.softDeleted && value.id == id {
subjectsAndSchemas = append(subjectsAndSchemas, key)
}
}
c.schemaToIDCacheLock.RUnlock()

c.schemaToVersionCacheLock.RLock()
for _, sas := range subjectsAndSchemas {
versionEntry, ok := c.schemaToVersionCache[sas]
if ok && !versionEntry.softDeleted {
subjectsAndVersions = append(subjectsAndVersions, SubjectAndVersion{
Subject: sas.subject,
Version: versionEntry.version,
})
}
}
c.schemaToVersionCacheLock.RUnlock()

if len(subjectsAndVersions) == 0 {
err = &url.Error{
Op: "GET",
URL: c.url.String() + fmt.Sprintf(subjectsAndVersionsById, id),
Err: errors.New("schema ID not found"),
}
}

sort.Slice(subjectsAndVersions, func(i, j int) bool {
return subjectsAndVersions[i].Subject < subjectsAndVersions[j].Subject
})
return
}

// GetID checks if a schema has been registered with the subject. Returns ID if the registration can be found
func (c *mockclient) GetID(subject string, schema SchemaInfo, normalize bool) (id int, err error) {
schemaJSON, err := schema.MarshalJSON()
Expand Down
34 changes: 18 additions & 16 deletions schemaregistry/rest_service.go
Expand Up @@ -38,22 +38,24 @@ import (
// Relative Confluent Schema Registry REST API endpoints as described in the Confluent documentation
// https://docs.confluent.io/current/schema-registry/docs/api.html
const (
base = ".."
schemas = "/schemas/ids/%d"
schemasBySubject = "/schemas/ids/%d?subject=%s"
subject = "/subjects"
subjects = subject + "/%s"
subjectsNormalize = subject + "/%s?normalize=%t"
subjectsDelete = subjects + "?permanent=%t"
version = subjects + "/versions"
versionNormalize = subjects + "/versions?normalize=%t"
versions = version + "/%v"
versionsDelete = versions + "?permanent=%t"
compatibility = "/compatibility" + versions
config = "/config"
subjectConfig = config + "/%s"
mode = "/mode"
modeConfig = mode + "/%s"
base = ".."
context = "/contexts"
schemas = "/schemas/ids/%d"
schemasBySubject = "/schemas/ids/%d?subject=%s"
subjectsAndVersionsById = "/schemas/ids/%d/versions"
subject = "/subjects"
subjects = subject + "/%s"
subjectsNormalize = subject + "/%s?normalize=%t"
subjectsDelete = subjects + "?permanent=%t"
version = subjects + "/versions"
versionNormalize = subjects + "/versions?normalize=%t"
versions = version + "/%v"
versionsDelete = versions + "?permanent=%t"
compatibility = "/compatibility" + versions
config = "/config"
subjectConfig = config + "/%s"
mode = "/mode"
modeConfig = mode + "/%s"

targetSRClusterKey = "Target-Sr-Cluster"
targetIdentityPoolIDKey = "Confluent-Identity-Pool-Id"
Expand Down