diff --git a/build-parent/pom.xml b/build-parent/pom.xml index e2d6e9c1b05eb..2a9d7f36089c1 100644 --- a/build-parent/pom.xml +++ b/build-parent/pom.xml @@ -134,6 +134,8 @@ 4.6.1 + 0.9.15 + 1.9.1 6.1.2 3.6.1 5.62.2 @@ -309,6 +311,16 @@ bootstrap ${webjar.bootstrap.version} + + org.webjars + bootstrap-multiselect + ${webjar.bootstrap-multiselect.version} + + + org.webjars.npm + bootstrap-icons + ${webjar.bootstrap-icons.version} + org.webjars font-awesome diff --git a/extensions/kafka-client/deployment/pom.xml b/extensions/kafka-client/deployment/pom.xml index c1312bd48e115..a7da8dfa0277f 100644 --- a/extensions/kafka-client/deployment/pom.xml +++ b/extensions/kafka-client/deployment/pom.xml @@ -44,6 +44,10 @@ io.quarkus quarkus-caffeine-deployment + + io.quarkus + quarkus-vertx-http-dev-console-spi + org.testcontainers testcontainers diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeConfig.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeConfig.java index b975b460e869b..420d8bd85a453 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeConfig.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaBuildTimeConfig.java @@ -28,4 +28,5 @@ public class KafkaBuildTimeConfig { */ @ConfigItem public KafkaDevServicesBuildTimeConfig devservices; + } diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java index 41fb178c3ea4a..c29791d421569 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/KafkaProcessor.java @@ -60,6 +60,7 @@ import io.quarkus.deployment.Capabilities; import io.quarkus.deployment.Capability; import io.quarkus.deployment.Feature; +import io.quarkus.deployment.IsDevelopment; import io.quarkus.deployment.IsNormal; import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; @@ -71,6 +72,7 @@ import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.IndexDependencyBuildItem; +import io.quarkus.deployment.builditem.LaunchModeBuildItem; import io.quarkus.deployment.builditem.LogCategoryBuildItem; import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; import io.quarkus.deployment.builditem.RuntimeConfigSetupCompleteBuildItem; @@ -82,9 +84,14 @@ import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem; import io.quarkus.deployment.logging.LogCleanupFilterBuildItem; import io.quarkus.deployment.pkg.NativeConfig; -import io.quarkus.kafka.client.runtime.KafkaBindingConverter; -import io.quarkus.kafka.client.runtime.KafkaRecorder; +import io.quarkus.dev.spi.DevModeType; +import io.quarkus.devconsole.spi.DevConsoleRouteBuildItem; +import io.quarkus.devconsole.spi.DevConsoleWebjarBuildItem; +import io.quarkus.kafka.client.runtime.*; import io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer; +import io.quarkus.kafka.client.runtime.ui.KafkaTopicClient; +import io.quarkus.kafka.client.runtime.ui.KafkaUiRecorder; +import io.quarkus.kafka.client.runtime.ui.KafkaUiUtils; import io.quarkus.kafka.client.serialization.BufferDeserializer; import io.quarkus.kafka.client.serialization.BufferSerializer; import io.quarkus.kafka.client.serialization.JsonArrayDeserializer; @@ -95,6 +102,7 @@ import io.quarkus.kafka.client.serialization.JsonbSerializer; import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer; import io.quarkus.kafka.client.serialization.ObjectMapperSerializer; +import io.quarkus.maven.dependency.GACT; import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem; public class KafkaProcessor { @@ -144,6 +152,11 @@ public class KafkaProcessor { static final DotName PARTITION_ASSIGNER = DotName .createSimple("org.apache.kafka.clients.consumer.internals.PartitionAssignor"); + private static final GACT DEVCONSOLE_WEBJAR_ARTIFACT_KEY = new GACT("io.quarkus", + "quarkus-kafka-client-deployment", null, "jar"); + private static final String DEVCONSOLE_WEBJAR_STATIC_RESOURCES_PATH = "dev-static/"; + public static final String KAFKA_ADMIN_PATH = "kafka-admin"; + public static final String KAFKA_RESOURCES_ROOT_PATH = "kafka-ui"; @BuildStep FeatureBuildItem feature() { @@ -165,7 +178,8 @@ void silenceUnwantedConfigLogs(BuildProducer logClean List ignoredMessages = new ArrayList<>(); for (String ignoredConfigProperty : ignoredConfigProperties) { - ignoredMessages.add("The configuration '" + ignoredConfigProperty + "' was supplied but isn't a known config."); + ignoredMessages + .add("The configuration '" + ignoredConfigProperty + "' was supplied but isn't a known config."); } logCleanupFilters.produce(new LogCleanupFilterBuildItem("org.apache.kafka.clients.consumer.ConsumerConfig", @@ -478,4 +492,41 @@ void registerServiceBinding(Capabilities capabilities, KafkaBindingConverter.class.getName())); } } + + // Kafka UI related stuff + + @BuildStep + public AdditionalBeanBuildItem kafkaClientBeans() { + return AdditionalBeanBuildItem.builder() + .addBeanClass(KafkaAdminClient.class) + .addBeanClass(KafkaTopicClient.class) + .addBeanClass(KafkaUiUtils.class) + .setUnremovable() + .build(); + } + + @BuildStep(onlyIf = IsDevelopment.class) + @Record(ExecutionTime.RUNTIME_INIT) + public void registerKafkaUiExecHandler( + BuildProducer routeProducer, + KafkaUiRecorder recorder) { + routeProducer.produce(DevConsoleRouteBuildItem.builder() + .method("POST") + .handler(recorder.kafkaControlHandler()) + .path(KAFKA_ADMIN_PATH) + .bodyHandlerRequired() + .build()); + } + + @BuildStep(onlyIf = IsDevelopment.class) + public DevConsoleWebjarBuildItem setupWebJar(LaunchModeBuildItem launchModeBuildItem) { + if (launchModeBuildItem.getDevModeType().orElse(null) != DevModeType.LOCAL) { + return null; + } + return DevConsoleWebjarBuildItem.builder().artifactKey(DEVCONSOLE_WEBJAR_ARTIFACT_KEY) + .root(DEVCONSOLE_WEBJAR_STATIC_RESOURCES_PATH) + .routeRoot(KAFKA_RESOURCES_ROOT_PATH) + .build(); + } + } diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/config.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/config.js new file mode 100644 index 0000000000000..130b130828fbe --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/config.js @@ -0,0 +1,2 @@ +export const api = '/q/dev/io.quarkus.quarkus-kafka-client/kafka-admin'; +export const ui = 'kafka-ui'; \ No newline at end of file diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/kafka_ui.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/kafka_ui.js new file mode 100644 index 0000000000000..36667854c1944 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/kafka_ui.js @@ -0,0 +1,9 @@ +import Navigator from './pages/navigator.js' + +const navigator = new Navigator(); +$(document).ready( + () => { + navigator.navigateToDefaultPage(); + } +); + diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/accessControlListPage.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/accessControlListPage.js new file mode 100644 index 0000000000000..bf10bd05b5e66 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/accessControlListPage.js @@ -0,0 +1,49 @@ +import {doPost, errorPopUp} from "../web/web.js"; +import {createTableItem} from "../util/contentManagement.js"; +import {toggleSpinner} from "../util/spinner.js"; + +export default class AccessControlListPage { + constructor(containerId) { + this.containerId = containerId; + Object.getOwnPropertyNames(AccessControlListPage.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + + open() { + const req = { + action: "getAclInfo" + }; + toggleSpinner(this.containerId); + doPost(req, (data) => { + setTimeout(() => { + this.updateInfo(data); + toggleSpinner(this.containerId); + }, 2000); + }, data => { + errorPopUp("Error getting Kafka ACL info: ", data); + }); + } + + updateInfo(data) { + $('#acluster-id').html(data.clusterId); + $('#acluster-controller').html(data.broker); + $('#acluster-acl').html(data.aclOperations); + + const acls = data.entries; + let aclTable = $('#acl-table tbody'); + aclTable.empty(); + for (let i = 0; i < acls.length; i++) { + const e = acls[i]; + let tableRow = $(""); + tableRow.append(createTableItem(e.operation)); + tableRow.append(createTableItem(e.prinipal)); + tableRow.append(createTableItem(e.perm)); + tableRow.append(createTableItem(e.pattern)); + aclTable.append(tableRow); + } + } +} \ No newline at end of file diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/consumerGroupDetailsPage.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/consumerGroupDetailsPage.js new file mode 100644 index 0000000000000..3766588ce9292 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/consumerGroupDetailsPage.js @@ -0,0 +1,85 @@ +import {CollapseRow, createTableHead, createTableItem, createTableItemHtml} from "../util/contentManagement.js"; + +export default class ConsumerGroupDetailsPage { + constructor(containerId) { + this.containerId = containerId; + Object.getOwnPropertyNames(ConsumerGroupDetailsPage.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + open(params) { + const membersData = params[1]; + let consumerGroupsTable = $('#consumer-group-details-table tbody'); + consumerGroupsTable.empty(); + for (let i = 0; i < membersData.length; i++) { + const d = membersData[i]; + const groupId = "group-" + d.memberId; + + let tableRow = $(""); + let collapseRow; + if (d.partitions.length > 0) { + collapseRow = new CollapseRow(groupId); + tableRow.append(createTableItemHtml(collapseRow.arrow)); + } else { + tableRow.append(createTableItem("")); + } + + const memberId = $("") + .text(d.clientId); + const id = d.memberId.substring(d.clientId.length); + const text = $("

") + .append(memberId) + .append(id); + tableRow.append(createTableItemHtml(text)); + tableRow.append(createTableItem(d.host)); + tableRow.append(createTableItem("" + new Set(d.partitions.map(x => x.partition)).size)); + tableRow.append(createTableItem("" + d.partitions.map(x => x.lag).reduce((l, r) => l + r, 0))); + + if (d.partitions.length > 0) { + const content = this.createConsumerGroupCollapseInfo(d); + tableRow.addClass("pointer") + tableRow.click(() => collapseRow.collapse()); + consumerGroupsTable.append(tableRow); + consumerGroupsTable.append(collapseRow + .getCollapseContent(tableRow.children().length, content) + .addClass("no-hover")); + } else { + consumerGroupsTable.append(tableRow); + } + } + } + + createConsumerGroupCollapseInfo(dataItem) { + const collapseContent = $("") + .addClass("table") + .addClass("table-sm") + .addClass("no-hover"); + + const headers = $("") + .addClass("no-hover") + .append(createTableHead("Topic")) + .append(createTableHead("Partition")) + .append(createTableHead("Lag")); + const head = $("") + .append(headers); + + const body = $(""); + for (let partition of dataItem.partitions) { + const row = $("") + .addClass("no-hover"); + row.append(createTableItemHtml(partition.topic)) + row.append(createTableItemHtml(partition.partition)) + row.append(createTableItemHtml(partition.lag)) + body.append(row); + } + + collapseContent.append(head); + collapseContent.append(body); + + return collapseContent; + } + +} \ No newline at end of file diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/consumerGroupPage.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/consumerGroupPage.js new file mode 100644 index 0000000000000..51928af67a7e3 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/consumerGroupPage.js @@ -0,0 +1,47 @@ +import {createTableItem} from "../util/contentManagement.js"; +import {doPost, errorPopUp} from "../web/web.js"; +import {pages} from "./navigator.js"; +import {toggleSpinner} from "../util/spinner.js"; + +export default class ConsumerGroupPage { + constructor(navigator, containerId) { + this.containerId = containerId; + this.navigator = navigator; + Object.getOwnPropertyNames(ConsumerGroupPage.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + open() { + toggleSpinner(this.containerId); + const req = { + action: "getInfo", key: "0", value: "0" + }; + doPost(req, (data) => { + this.updateConsumerGroups(data.consumerGroups); + toggleSpinner(this.containerId); + }, data => { + errorPopUp("Error getting Kafka info: ", data); + toggleSpinner(this.containerId); + }); + } + + updateConsumerGroups(data) { + let consumerGroupsTable = $('#consumer-groups-table tbody'); + consumerGroupsTable.empty(); + for (let i = 0; i < data.length; i++) { + const d = data[i]; + let tableRow = $(""); + tableRow.append(createTableItem(d.state)); + tableRow.append(createTableItem(d.name)); + tableRow.append(createTableItem(d.coordinatorId)); + tableRow.append(createTableItem(d.protocol)); + tableRow.append(createTableItem(d.members.length)); + tableRow.append(createTableItem(d.lag)); + tableRow.click(() => this.navigator.navigateTo(pages.CONSUMER_GROUPS_DETAILS, [d.name, d.members])); + consumerGroupsTable.append(tableRow); + } + } +} \ No newline at end of file diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/messagesPage.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/messagesPage.js new file mode 100644 index 0000000000000..f009847be3826 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/messagesPage.js @@ -0,0 +1,449 @@ +import {doPost, errorPopUp} from "../web/web.js"; +import timestampToFormattedString from "../util/datetimeUtil.js"; +import {CollapseRow, createTableItem, createTableItemHtml} from "../util/contentManagement.js"; +import {toggleSpinner} from "../util/spinner.js"; + +const MODAL_KEY_TAB = "header-key-tab-pane"; +const PAGE_SIZE = 20; +const NEW_FIRST = "NEW_FIRST"; +const OLD_FIRST = "OLD_FIRST"; +const MESSAGES_SPINNER = "message-load-spinner"; +const MESSAGES_TABLE_BODY = "msg-table-body"; +const MESSAGES_TABLE_HOLDER = "msg-table-holder"; + +export default class MessagesPage { + constructor(containerId) { + this.containerId = containerId; + this.registerButtonHandlers(); + Object.getOwnPropertyNames(MessagesPage.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + registerButtonHandlers() { + $("#open-create-msg-modal-btn").click(() => { + $('#create-msg-modal').modal('show'); + this.setActiveTab(MODAL_KEY_TAB); + }); + + $('#send-msg-btn').click(this.createMessage.bind(this)); + + $('.close-modal-btn').click(() => { + $('.modal').modal('hide'); + this.setActiveTab(MODAL_KEY_TAB); + }); + + $('#msg-page-partition-select').multiselect({ + buttonClass: 'thead-multiselect', + includeSelectAllOption: true, + filterPlaceholder: 'Partitions', + selectAllText: 'Select All', + nonSelectedText: 'Partitions', + buttonText: function () { + return 'Partitions'; + } + }); + + $("#timestamp-sort-header").click(() => { + this.toggleSorting(); + window.currentContext.currentPage = 1; + this.loadMessages(); + }); + + $("#msg-page-partition-select").change(() => { + window.currentContext.currentPage = 1; + this.loadMessages(); + }); + + $(".previous").click(() => { + if (window.currentContext.currentPage === 1) return; + window.currentContext.currentPage = window.currentContext.currentPage - 1; + this.loadMessages(); + }) + + $(".next").click(() => { + if (window.currentContext.currentPage === this.getMaxPageNumber()) return; + window.currentContext.currentPage = window.currentContext.currentPage + 1; + this.loadMessages(); + }) + + $("#reload-msg-btn").click(() => { + currentContext.pagesCache = new Map(); + this.loadMessages(); + }); + } + + toggleSorting() { + if (currentContext.currentSorting === NEW_FIRST) { + currentContext.currentSorting = OLD_FIRST; + $("#timestamp-sort-icon") + .removeClass("bi-chevron-double-down") + .addClass("bi-chevron-double-up"); + } else { + currentContext.currentSorting = NEW_FIRST; + $("#timestamp-sort-icon") + .addClass("bi-chevron-double-down") + .removeClass("bi-chevron-double-up"); + } + } + + loadMessages() { + toggleSpinner(MESSAGES_TABLE_HOLDER, MESSAGES_SPINNER); + this.getPage(currentContext.currentPage, this.onMessagesLoaded, this.onMessagesFailed); + this.redrawPageNav(); + } + + open(params) { + toggleSpinner(MESSAGES_TABLE_HOLDER, MESSAGES_SPINNER); + const topicName = params[0]; + window.currentContext = { + topicName: topicName, + currentPage: 1, //always start with first page + pagesCache: new Map(), + currentSorting: NEW_FIRST + }; + + this.clearMessageTable(); + + new Promise((resolve, reject) => { + this.requestPartitions(topicName, resolve, reject); + }).then((data) => { + this.onPartitionsLoaded(data); + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }).then(() => { + this.loadMaxPageNumber(); + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }).then(() => { + this.getPage(currentContext.currentPage, this.onMessagesLoaded, this.onMessagesFailed); + return new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 1000); + }); + }) + .catch(() => errorPopUp("Failed loading page.")); + } + + // Key format: ORDER-partition1-partition2-...-partitionN-pageNumber. Like: NEW_FIRST-0-1-17 + generateCacheKey(pageNumber) { + const order = this.getOrder(); + const partitions = this.getPartitions(); + const partitionsKeyPart = partitions.reduce((partialKey, str) => partialKey + "-" + str, 0); + + return order + partitionsKeyPart + "-" + pageNumber; + } + + requestPartitions(topicName, onPartitionsLoaded, onPartitionsFailed) { + const rq = { + action: "getPartitions", topicName: topicName + } + + doPost(rq, onPartitionsLoaded, onPartitionsFailed); + } + + onPartitionsLoaded(data) { + let msgModalPartitionSelect = $('#msg-modal-partition-select'); + let msgPagePartitionSelect = $('#msg-page-partition-select'); + msgModalPartitionSelect.empty(); + msgPagePartitionSelect.empty(); + + msgModalPartitionSelect.append($(""); + const groupId = "group-" + window.crypto.randomUUID(); + const collapseRow = new CollapseRow(groupId); + tableRow.append(createTableItemHtml(collapseRow.arrow)); + + tableRow.append(createTableItem(messages[i].offset)); + tableRow.append(createTableItem(messages[i].partition)); + tableRow.append(createTableItem(timestampToFormattedString(messages[i].timestamp))); + tableRow.append(createTableItem(messages[i].key)); + + const value = messages[i].value; + const maxMsgLength = 75; + if (value.length < maxMsgLength) { + tableRow.append(createTableItem(value)); + } else { + tableRow.append(createTableItem(value.slice(0, maxMsgLength) + "...")); + } + tableRow.append(createTableItem()); + tableRow + .addClass("pointer") + .click(collapseRow.collapse); + msgTableBody.append(tableRow); + msgTableBody.append(collapseRow.getCollapseContent(tableRow.children().length, this.createMessageCollapseItem(value))); + } + + currentContext.lastOffset = data.partitionOffset; + toggleSpinner(MESSAGES_TABLE_HOLDER, MESSAGES_SPINNER); + } + + createMessageCollapseItem(fullMessage) { + return $("
") + .text(fullMessage); + } + + toggleContent() { + return (event) => { + const textBlock = $(event.target); + const dots = textBlock.find(".dots"); + const hiddenText = textBlock.find(".hidden-text"); + + if (dots.hasClass("hidden")) { + dots.removeClass("hidden"); + dots.addClass("text-shown"); + hiddenText.removeClass("text-shown"); + hiddenText.addClass("hidden"); + } else { + dots.removeClass("text-shown"); + dots.addClass("hidden"); + hiddenText.removeClass("hidden"); + hiddenText.addClass("text-shown"); + } + }; + } + + onMessagesFailed(data, errorType, error) { + console.error("Error getting topic messages"); + } + + requestCreateMessage() { + const topicName = currentContext.topicName; + let partition = $('#msg-modal-partition-select option:selected').val(); + if (partition === 'any') partition = null; + + let valueTextarea = $('#msg-value-textarea'); + let keyTextarea = $('#msg-key-textarea'); + const rq = { + action: "createMessage", + topic: topicName, + partition: partition, + value: valueTextarea.val(), + key: keyTextarea.val() + }; + + // TODO: print out partitions count on topics page + doPost(rq, data => { + currentContext.pagesCache = new Map(); + new Promise(this.loadMaxPageNumber) + .then(this.loadMessages) + .catch(() => errorPopUp("Failed")); + }, (data, errorType, error) => { + errorPopUp("Failed to reload messages."); + }); + } + + setActiveTab(tab) { + $('.nav-tabs button[href="#' + tab + '"]').click(); + }; + + createMessage() { + this.requestCreateMessage(); + + // Clean inputs for future reuse of modal. + $('#create-msg-modal').modal('hide'); + $('#msg-value-textarea').val(""); + $('#msg-key-textarea').val(""); + $('#msg-modal-partition-select').val("any"); + $('#msg-modal-type-select').val("text"); + + $('body').removeClass('modal-open'); + $('.modal-backdrop').remove(); + + this.setActiveTab(MODAL_KEY_TAB); + } + + clearMessageTable() { + $('#msg-table-body').empty(); + } + + redrawPageNav() { + //TODO: add GOTO page input + const previous = $(".previous"); + const next = $(".next"); + + previous.removeClass("disabled"); + next.removeClass("disabled"); + + const maxPageNumber = this.getMaxPageNumber(); + const currentPage = currentContext.currentPage; + let pages = [currentPage]; + + if (currentPage > 1) { + pages.unshift(currentPage - 1); + } + if (currentPage < maxPageNumber) { + pages.push(currentPage + 1); + } + + if (currentPage === 1) { + previous.addClass("disabled"); + if (maxPageNumber > 2) { + pages.push(currentPage + 2); + } + } + if (currentPage === maxPageNumber) { + next.addClass("disabled"); + if (maxPageNumber > 2) { + pages.unshift(currentPage - 2); + } + } + + const pagination = $("#msg-pagination"); + + // Remove all page children numbers. + while (pagination.children().length !== 2) { + pagination.children()[1].remove(); + } + + for (const p of pages) { + let a = $("") + .text("" + p) + .addClass("page-link"); + let li = $("
  • ") + .addClass("page-item") + .click(() => { + toggleSpinner(MESSAGES_TABLE_HOLDER, MESSAGES_SPINNER); + currentContext.currentPage = p; + this.getPage(p, this.onMessagesLoaded, this.onMessagesFailed); + this.redrawPageNav(); + }); + + if (p === currentPage) { + li.addClass("active"); + } + li.append(a); + + const lastPosition = pagination.children().length - 1; + li.insertBefore(".next"); + } + } + + requestOffset(topicName, order, onOffsetLoaded, onOffsetFailed, partitions) { + const req = { + action: "getOffset", + topicName: topicName, + order: order, + requestedPartitions: partitions === undefined ? this.getPartitions() : partitions + }; + doPost(req, onOffsetLoaded, onOffsetFailed); + } + + // TODO: add possibility to hide panel on the left + loadMaxPageNumber() { + const partitions = this.getPartitions(); + this.requestOffset( + currentContext.topicName, + NEW_FIRST, + (data) => { + currentContext.partitionOffset = new Map( + Object.entries(data).map(x => [parseInt(x[0]), x[1]]) + ); + this.redrawPageNav(); + }, + (data, errorType, error) => { + console.error("Error getting max page number."); + }, + partitions + ); + } + + getMaxPageNumber() { + const partitions = this.getPartitions(); + const totalElements = partitions.map(x => { + const a = currentContext.partitionOffset.get(x) + return a; + }) + .reduce((partialSum, a) => partialSum + a, 0); + return Math.max(Math.ceil(totalElements / PAGE_SIZE), 1); + } + + getOrder() { + return currentContext.currentSorting; + } + +} \ No newline at end of file diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/navigator.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/navigator.js new file mode 100644 index 0000000000000..cd5e66615a4aa --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/navigator.js @@ -0,0 +1,175 @@ +import MessagesPage from "./messagesPage.js"; +import TopicsPage from "./topicsPage.js"; +import ConsumerGroupPage from "./consumerGroupPage.js"; +import ConsumerGroupDetailsPage from "./consumerGroupDetailsPage.js"; +import AccessControlListPage from "./accessControlListPage.js"; +import NodesPage from "./nodesPage.js"; +import {createIcon} from "../util/contentManagement.js"; + +export const pages = { + TOPICS: "topics-page", + SCHEMA: "schema-page", + CONSUMER_GROUPS: "consumer-groups-page", + CONSUMER_GROUPS_DETAILS: "consumer-groups-details-page", + ACCESS_CONTROL_LIST: "access-control-list-page", + NODES: "nodes-page", + TOPIC_MESSAGES: "topic-messages-page", + DEFAULT: "topics-page" +} + +export default class Navigator { + constructor() { + this.registerNavbar(); + } + + allPages = { + [pages.TOPICS]: { + header: "Topics", + showInNavbar: true, + instance: new TopicsPage(this, pages.TOPICS), + icon: "bi-collection" + }, + [pages.SCHEMA]: { + header: "Schema registry", + showInNavbar: true, + icon: "bi-file-code" + }, + [pages.CONSUMER_GROUPS]: { + header: "Consumer groups", + showInNavbar: true, + instance: new ConsumerGroupPage(this, pages.CONSUMER_GROUPS), + icon: "bi-inboxes" + }, + [pages.ACCESS_CONTROL_LIST]: { + header: "Access control list", + showInNavbar: true, + instance: new AccessControlListPage(pages.ACCESS_CONTROL_LIST), + icon: "bi-shield-lock" + }, + [pages.NODES]: { + header: "Nodes", + showInNavbar: true, + instance: new NodesPage(pages.NODES), + icon: "bi-diagram-3" + }, + [pages.TOPIC_MESSAGES]: { + header: "Messages", + showInNavbar: false, + instance: new MessagesPage(pages.TOPIC_MESSAGES), + parent: pages.TOPICS + }, + [pages.CONSUMER_GROUPS_DETAILS]: { + header: "Consumer group details", + showInNavbar: false, + instance: new ConsumerGroupDetailsPage(pages.CONSUMER_GROUPS_DETAILS), + parent: pages.CONSUMER_GROUPS + } + }; + + registerNavbar() { + const keys = Object.keys(this.allPages); + const navbar = $("#navbar-list"); + navbar.empty(); + + for (let i = 0; i < keys.length; i++) { + const key = keys[i]; + const value = this.allPages[key]; + if (!value.showInNavbar) continue; + const navItem = $("
  • ") + .addClass("nav-item") + .addClass("left-padding") + .addClass("pointer"); + + const navHolder = $("
    ") + .addClass("d-flex") + .addClass("left-margin") + .addClass("nav-row") + .click(() => this.navigateTo(key)); + + const icon = createIcon(value.icon) + .addClass("align-self-center"); + const navLink = $("", { + text: value.header, + href: "#" + }) + .addClass("nav-link") + .addClass("active") + .addClass("link"); + navHolder.append(icon); + navHolder.append(navLink); + navItem.append(navHolder); + navbar.append(navItem); + } + } + + navigateTo(requestedPage, params) { + const keys = Object.keys(this.allPages); + for (let i = 0; i < keys.length; i++) { + const elementName = keys[i]; + const d = $("#" + elementName); + if (d !== null) { + if (elementName !== requestedPage) { + d.removeClass("shown") + .addClass("hidden"); + } else { + d.removeClass("hidden") + .addClass("shown"); + this.open(requestedPage, params); + } + } else { + console.error("Can not find page div: ", keys[i]); + } + } + + this.navigateBreadcrumb(requestedPage, params); + } + + navigateToDefaultPage() { + this.navigateTo(pages.DEFAULT); + } + + open(pageId, params) { + const value = this.allPages[pageId]; + value.instance.open(params); + } + + navigateBreadcrumb(page, params) { + const breadcrumb = $("#nav-breadcrumb"); + breadcrumb.empty(); + + let nextPage = this.allPages[page]; + let pageId = page; + + let i = 0; + while (nextPage !== undefined) { + let li; + // We only need to append possible params to the very first element. + if (i === 0) { + li = this.createBreadcrumbItem(nextPage.header, pageId, true, params); + } else { + li = this.createBreadcrumbItem(nextPage.header, pageId, false); + } + breadcrumb.prepend(li); + pageId = nextPage.parent; + nextPage = this.allPages[pageId]; + i++; + } + } + + createBreadcrumbItem(text, pageId, isActive, params) { + let breadcrumbText = text; + if (params !== undefined && params.length > 0 && (params[0] !== null && params[0] !== undefined)) { + breadcrumbText = text + " (" + params[0] + ")"; + } + const a = $("", {href: "#", text: breadcrumbText}) + .click(() => this.navigateTo(pageId, params)); + if (isActive) { + a.addClass("active"); + } + + const li = $("
  • ") + .addClass("breadcrumb-item"); + li.append(a); + return li; + } +} \ No newline at end of file diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/nodesPage.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/nodesPage.js new file mode 100644 index 0000000000000..94b2b1e6a270d --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/nodesPage.js @@ -0,0 +1,47 @@ +import {doPost, errorPopUp} from "../web/web.js"; +import {createTableItem} from "../util/contentManagement.js"; +import {toggleSpinner} from "../util/spinner.js"; + +export default class NodesPage { + constructor(containerId) { + this.containerId = containerId; + Object.getOwnPropertyNames(NodesPage.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + open() { + const req = { + action: "getInfo" + }; + doPost(req, (data) => { + setTimeout(() => { + this.updateInfo(data); + toggleSpinner(this.containerId); + }, 2000); + }, data => { + errorPopUp("Error getting Kafka info: ", data); + }); + toggleSpinner(this.containerId); + } + + updateInfo(data) { + $('#cluster-id').html(data.clusterInfo.id); + $('#cluster-controller').html(data.broker); + $('#cluster-acl').html(data.clusterInfo.aclOperations); + + const nodes = data.clusterInfo.nodes; + let clusterNodesTable = $('#cluster-table tbody'); + clusterNodesTable.empty(); + for (let i = 0; i < nodes.length; i++) { + const d = nodes[i]; + let tableRow = $("
  • "); + tableRow.append(createTableItem(d.id)); + tableRow.append(createTableItem(d.host)); + tableRow.append(createTableItem(d.port)); + clusterNodesTable.append(tableRow); + } + } +} \ No newline at end of file diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/schemaPage.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/schemaPage.js new file mode 100644 index 0000000000000..82b3f5f8d108c --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/schemaPage.js @@ -0,0 +1,16 @@ +export default class SchemaPage{ + constructor(containerId) { + this.containerId = containerId; + Object.getOwnPropertyNames(SchemaPage.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + // TODO: stub. must be implemented by all pages + open(){ + + } + +} \ No newline at end of file diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/topicsPage.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/topicsPage.js new file mode 100644 index 0000000000000..28156717578c8 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/pages/topicsPage.js @@ -0,0 +1,188 @@ +import {doPost, errorPopUp} from "../web/web.js"; +import {createIcon, createTableItem, createTableItemHtml, hideItem, showItem} from "../util/contentManagement.js"; +import {pages} from "./navigator.js"; + +export default class TopicsPage { + constructor(navigator, containerId) { + this.navigator = navigator; + this.containerId = containerId; + this.registerButtonHandlers(); + + // TODO: move to common function with comment + Object.getOwnPropertyNames(TopicsPage.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + open() { + window.currentContext = {}; + this.requestTopics(this.onTopicsLoaded, this.onTopicsFailed); + } + + registerButtonHandlers() { + + const topicNameInput = $("#topic-name-modal-input"); + $("#create-topic-btn").click(() => { + if (!this.validateTopicName(topicNameInput.val())) { + this.showErrorIfInvalid(topicNameInput.val(), this.validateTopicName, topicNameValidationErrorBox); + return; + } + + this.createTopic(this.onTopicsLoaded, this.onTopicsFailed); + $('#create-topic-modal').modal('hide'); + $('#topic-name-modal-input').val(""); + $('#partitions-modal-input').val(""); + $('#replications-modal-input').val(""); + }) + + $("#open-create-topic-modal-btn").click(() => { + this.loadNodesCount(); + $('#create-topic-modal').modal('show'); + }); + + $('.close-modal-btn').click(() => { + hideItem($(".modal")); + hideItem($("#topic-creation-validation-msg-box")); + hideItem($("#topic-name-validation-msg")); + hideItem($("#replication-validation-msg")); + }); + + $("#delete-topic-btn").click(() => { + const currentTopic = window.currentContext.topicName; + this.deleteTopic(currentTopic, this.deleteTopicRow, this.onTopicsFailed) + $("#delete-topic-modal").modal("hide"); + }); + + const topicNameValidationErrorBox = $("#topic-name-validation-msg"); + topicNameInput.keyup(() => this.showErrorIfInvalid(topicNameInput.val(), this.validateTopicName, topicNameValidationErrorBox)); + topicNameInput.change(() => this.showErrorIfInvalid(topicNameInput.val(), this.validateTopicName, topicNameValidationErrorBox)); + + const replicationInput = $("#replications-modal-input"); + replicationInput.keyup(() => { + const value = replicationInput.val(); + this.showErrorIfInvalid(value, this.validateReplicationFactor, $("#replication-validation-msg")); + }); + } + + loadNodesCount() { + const req = { + action: "getInfo" + }; + doPost(req, (data) => { + window.currentContext.nodesCount = data.clusterInfo.nodes.length; + }, data => { + errorPopUp("Could not obtain nodes count."); + }); + } + + showErrorIfInvalid(value, validationFunction, errBoxSelector) { + const valid = validationFunction(value); + if (!valid) { + showItem($("#topic-creation-validation-msg-box")); + showItem(errBoxSelector); + $("#create-topic-btn") + .addClass("disabled") + .attr("disabled", true); + } else { + hideItem(errBoxSelector); + const topicMsgValidationBoxChildren = $("#topic-creation-validation-msg-box span"); + const allChildrenHidden = topicMsgValidationBoxChildren + .filter((x) => !$(x).hasClass("hidden")) + .length > 0; + if (allChildrenHidden) { + hideItem($("#topic-creation-validation-msg-box")); + $("#create-topic-btn") + .removeClass("disabled") + .attr("disabled", false); + } + } + } + + validateTopicName(name) { + const legalChars = /^[a-zA-Z\d\.\_]+$/; + const maxNameLength = 255; + return legalChars.test(name) && name.length < maxNameLength; + } + + validateReplicationFactor(replicationFactor) { + return currentContext.nodesCount >= replicationFactor; + } + + requestTopics(onTopicsLoaded, onTopicsFailed) { + const req = { + action: "getTopics" + }; + doPost(req, onTopicsLoaded, onTopicsFailed); + } + + onTopicsLoaded(data) { + let tableBody = $('#topics-table tbody'); + tableBody.empty(); + + for (let i = 0; i < data.length; i++) { + let tableRow = $(""); + let d = data[i]; + tableRow.append(createTableItem(d.name)); + tableRow.append(createTableItem(d.topicId)); + tableRow.append(createTableItem(d.partitionsCount)); + tableRow.append(createTableItem(("" + d.nmsg))); + + const deleteIcon = createIcon("bi-trash-fill"); + const deleteBtn = $("") + .addClass("btn") + .click((event) => { + window.currentContext.topicName = d.name; + $("#delete-topic-modal").modal("show"); + $("#delete-topic-name-span").text(d.name); + event.stopPropagation(); + }) + .append(deleteIcon); + + + tableRow.click(() => { + self.navigator.navigateTo(pages.TOPIC_MESSAGES, [d.name]); + }); + const controlHolder = $("
    ") + .append(deleteBtn); + tableRow.append(createTableItemHtml(controlHolder)); + + const self = this; + + tableBody.append(tableRow); + } + } + + onTopicsFailed(data) { + errorPopUp("Error getting topics: ", data); + } + + createTopic(onTopicsLoaded, onTopicsFailed) { + const topicName = $("#topic-name-modal-input").val(); + const partitions = $("#partitions-modal-input").val(); + const replications = $("#replications-modal-input").val(); + + const req = { + action: "createTopic", + topicName: topicName, + partitions: partitions, + replications: replications + }; + doPost(req, () => this.requestTopics(this.onTopicsLoaded, this.onTopicsFailed), onTopicsFailed); + } + + // TODO: add pagination here + deleteTopic(topicName, onTopicsDeleted, onTopicsFailed) { + const req = { + action: "deleteTopic", + key: topicName + }; + doPost(req, onTopicsDeleted, onTopicsFailed); + } + + deleteTopicRow(data) { + const topicName = window.currentContext.topicName; + $("#topics-table > tbody > tr > td:contains('" + topicName + "')").parent().remove() + } +} \ No newline at end of file diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/util/contentManagement.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/util/contentManagement.js new file mode 100644 index 0000000000000..d9f87034ab8dd --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/util/contentManagement.js @@ -0,0 +1,75 @@ +export function createTableItem(text) { + return $("
    ").append(createTableItemHtml( + collapseContent + .addClass("collapse-content")) + .attr("colspan", tableWidth)) + .attr("id", this.collapseId) + .addClass("collapse"); + } + + collapse() { + $("#" + this.collapseId).toggle(); + if (this.arrow.hasClass("icon-rotated")) { + this.arrow.removeClass("icon-rotated"); + } else { + this.arrow.addClass("icon-rotated"); + } + } +} + +export function showItem(selector){ + selector.addClass("shown") + .removeClass("hidden"); +} + +export function hideItem(selector){ + selector.addClass("hidden") + .removeClass("shown"); +} + +export function toggleItem(selector) { + if (selector.hasClass("shown")) { + hideItem(selector); + } else { + showItem(selector); + } +} \ No newline at end of file diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/util/datetimeUtil.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/util/datetimeUtil.js new file mode 100644 index 0000000000000..384d693bde861 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/util/datetimeUtil.js @@ -0,0 +1,17 @@ +function addTrailingZero(data) { + if (data < 10) { + return "0" + data; + } + return data; +} + +export default function timestampToFormattedString(UNIX_timestamp) { + const a = new Date(UNIX_timestamp); + const year = a.getFullYear(); + const month = addTrailingZero(a.getMonth()); + const date = addTrailingZero(a.getDate()); + const hour = addTrailingZero(a.getHours()); + const min = addTrailingZero(a.getMinutes()); + const sec = addTrailingZero(a.getSeconds()); + return date + '/' + month + '/' + year + ' ' + hour + ':' + min + ':' + sec; +} \ No newline at end of file diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/util/spinner.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/util/spinner.js new file mode 100644 index 0000000000000..a5ca80594e44f --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/util/spinner.js @@ -0,0 +1,21 @@ +export function toggleSpinner(containerId, spinnerContainerId) { + const spinnerId = spinnerContainerId === undefined ? "#page-load-spinner" : "#" + spinnerContainerId; + const toggleContainerId = "#" + containerId; + let first; + let second; + + if ($(spinnerId).hasClass("shown")) { + first = toggleContainerId; + second = spinnerId; + } else { + second = toggleContainerId; + first = spinnerId; + } + + $(first) + .removeClass("hidden") + .addClass("shown"); + $(second) + .addClass("hidden") + .removeClass("shown"); +} \ No newline at end of file diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-static/js/web/web.js b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/web/web.js new file mode 100644 index 0000000000000..6ba79b5c19720 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-static/js/web/web.js @@ -0,0 +1,22 @@ +import {api} from "../config.js" + +export function doPost(data, successCallback, errorCallback) { + $.ajax({ + url: api, + type: 'POST', + data: JSON.stringify(data), + contentType: "application/json; charset=utf-8", + dataType: 'json', + context: this, + success: (data) => successCallback(data), + error: (data, errorType, errorObj) => errorCallback(data, errorType, errorObj) + }); +} + +export function errorPopUp() { + let message = ""; + for (let i = 0; i < arguments.length; i++) { + message += arguments[i] + " "; + } + alert(message); +} diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-templates/embedded.html b/extensions/kafka-client/deployment/src/main/resources/dev-templates/embedded.html new file mode 100644 index 0000000000000..dac4cf81f4150 --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-templates/embedded.html @@ -0,0 +1,3 @@ + + + Kafka UI diff --git a/extensions/kafka-client/deployment/src/main/resources/dev-templates/kafka-dev-ui.html b/extensions/kafka-client/deployment/src/main/resources/dev-templates/kafka-dev-ui.html new file mode 100644 index 0000000000000..f07f33984441f --- /dev/null +++ b/extensions/kafka-client/deployment/src/main/resources/dev-templates/kafka-dev-ui.html @@ -0,0 +1,513 @@ +{#include main fluid=true} +{#style} +html { +min-height: 90vh; +min-width: 100vh; +} + +body { +min-height: 90vh; +min-width: 100vh; +} + +.row-holder { +padding: 0; +margin: 0; + +} + +.row:after { +content: ""; +display: table; +clear: both; +} + +.content-holder { +height: auto; +min-height: 90vh; +} + +.link { +background: none; +border: none; +} + +.top-margin { +margin-top: 1em; +} + +.left-margin { +margin-left: 1em; +} + +.left-padding { +padding-left: 1em; +} + +.shown { +display: flex; +height: auto; +min-width: 100%; +} + +.text-shown { +display: inline; +} + +.hidden { +display: none +} + +.nav-item:hover > .nav-row > a { +background-color: #005fff; +color: #e9ecef; +} + +.nav-item:hover > .nav-row > i { +background-color: #005fff; +color: #e9ecef; +} + +#navbar-list > .nav-item:hover { +background-color: #005fff; +color: #e9ecef; +} + +.table-hover:hover { +cursor: pointer; +} + +.multiselect-container > li > a > label { +padding-left: 15px !important; +} + +.page { +min-height: calc(100vh - 135px); +} + +.table-hover:hover { +cursor: pointer; +} + +.pointer { +cursor: pointer; +} + +.no-hover { +background-color: white; +cursor: default; +} + +.no-hover:hover { +background-color: white !important; +cursor: default; +} + +.icon-rotated { +transform: rotate(90deg); +} + +.navbar-brand img { +border-right: 1px solid darkgrey; +padding-right: 10px; +margin-right: 5px; +} + +.navbar-brand { +padding: 0; +margin: 0; +} + +#nav-menu-panel { +padding: 0px; +} + +.float-plus-btn { +position: fixed; +bottom: 60px; +right: 60px; +border-radius: 100%; +height: 50px; +width: 50px; +} + +.breadcrumb-item::before { +float: left; +padding-right: 0.5rem; +color: #007bff; +content: "〉"; +} + +.breadcrumb-item + .breadcrumb-item::before { +float: left; +padding-right: 0.5rem; +color: #007bff; +content: "〉"; +} + +.breadcrumb { +background-color: #343a40; +margin-bottom: 0; +padding: 0 0 0 5px; +} + +.bi-trash-fill:hover { +color: #007bff; +} + +.collapse-content { +max-width: 1200px; +} + +.thead-multiselect { +background-color: #343a40; +color: white; +border: 0px; +font-weight: bold; +} + +.thead-text { +color: white; +} + +#msg-table-holder { +min-width: 100%; +} +{/style} +{#styleref} + + +{/styleref} +{#scriptref} + + +{/scriptref} +{#title}Kafka Dev UI{/title} +{#body} +
    + + + + + + +
    +
    +
    +
    ", { + text: text + }); +} + +export function createTableItemHtml(html) { + return $("").append(html); +} + +export function createTableHead(title) { + return $("") + .attr("scope", "col") + .text(title); +} + +export function createIcon(iconClass) { + return $("") + .addClass("bi") + .addClass(iconClass); +} + +export class CollapseRow { + constructor(collapseId) { + this.collapseId = collapseId; + const chevronIcon = createIcon("bi-chevron-right") + .addClass("rotate-icon"); + this.arrow = $("
    ") + .addClass("d-flex") + .addClass("justify-content-center") + .append(chevronIcon); + + Object.getOwnPropertyNames(CollapseRow.prototype).forEach((key) => { + if (key !== 'constructor') { + this[key] = this[key].bind(this); + } + }); + } + + getCollapseContent(tableWidth, collapseContent) { + return $("
    + + + + + + + + + + + +
    Topic NameIdPartitions countNumber of msg
    + + + +

    +
    +
    +
    + + + + + + + + + + + + + + +
    Offset + + Timestamp KeyValue
    +
    + +
    +
    + + +
    +
    +
    + + + + + + + + + + + + + +
    StateIdCoordinatorProtocolMembersLag(Sum)
    +
    +
    +
    +
    + + + + + + + + + + + + +
    Member IDHostPartitionsLag(Sum)
    +
    +
    +
    +
    +
    + Kafka cluster id: 
    + Controller node (broker): 
    + ACL operations: 
    +
    +
    +

    Cluster nodes

    +
    + + + + + + + + + + +
    IdHostPort
    +
    +
    + + + +{/body} +{/include} \ No newline at end of file diff --git a/extensions/kafka-client/runtime/pom.xml b/extensions/kafka-client/runtime/pom.xml index 50ac39873257c..1d237acadf213 100644 --- a/extensions/kafka-client/runtime/pom.xml +++ b/extensions/kafka-client/runtime/pom.xml @@ -59,6 +59,11 @@ provided
    + + io.quarkus + quarkus-vertx-http-dev-console-runtime-spi + + io.quarkus quarkus-junit5-internal diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/health/KafkaHealthCheck.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/health/KafkaHealthCheck.java index a0b7c6648caa7..e9b9a24bd265d 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/health/KafkaHealthCheck.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/health/KafkaHealthCheck.java @@ -1,43 +1,23 @@ package io.quarkus.kafka.client.health; -import java.util.HashMap; -import java.util.Map; - -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.common.Node; import org.eclipse.microprofile.health.HealthCheck; import org.eclipse.microprofile.health.HealthCheckResponse; import org.eclipse.microprofile.health.HealthCheckResponseBuilder; import org.eclipse.microprofile.health.Readiness; -import io.smallrye.common.annotation.Identifier; +import io.quarkus.kafka.client.runtime.KafkaAdminClient; @Readiness @ApplicationScoped public class KafkaHealthCheck implements HealthCheck { - @Inject - @Identifier("default-kafka-broker") - Map config; - - private AdminClient client; - - @PostConstruct - void init() { - Map conf = new HashMap<>(config); - conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); - client = AdminClient.create(conf); - } + KafkaAdminClient kafkaAdminClient; - @PreDestroy - void stop() { - client.close(); + public KafkaHealthCheck(KafkaAdminClient kafkaAdminClient) { + this.kafkaAdminClient = kafkaAdminClient; } @Override @@ -45,7 +25,7 @@ public HealthCheckResponse call() { HealthCheckResponseBuilder builder = HealthCheckResponse.named("Kafka connection health check").up(); try { StringBuilder nodes = new StringBuilder(); - for (Node node : client.describeCluster().nodes().get()) { + for (Node node : kafkaAdminClient.getCluster().nodes().get()) { if (nodes.length() > 0) { nodes.append(','); } diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java new file mode 100644 index 0000000000000..c9b75dc1d00c0 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaAdminClient.java @@ -0,0 +1,84 @@ +package io.quarkus.kafka.client.runtime; + +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.apache.kafka.clients.admin.*; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.resource.ResourcePatternFilter; + +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaCreateTopicRequest; +import io.smallrye.common.annotation.Identifier; + +@ApplicationScoped +public class KafkaAdminClient { + private static final int DEFAULT_ADMIN_CLIENT_TIMEOUT = 5000; + + @Inject + @Identifier("default-kafka-broker") + Map config; + + private AdminClient client; + + @PostConstruct + void init() { + Map conf = new HashMap<>(config); + conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_ADMIN_CLIENT_TIMEOUT); + client = AdminClient.create(conf); + } + + @PreDestroy + void stop() { + client.close(); + } + + public DescribeClusterResult getCluster() { + return client.describeCluster(); + } + + public Collection getTopics() throws InterruptedException, ExecutionException { + return client.listTopics().listings().get(); + } + + public Collection getConsumerGroups() throws InterruptedException, ExecutionException { + var consumerGroupIds = client.listConsumerGroups().all().get().stream() + .map(ConsumerGroupListing::groupId) + .collect(Collectors.toList()); + return client.describeConsumerGroups(consumerGroupIds).all().get() + .values(); + } + + public boolean deleteTopic(String name) { + Collection topics = new ArrayList<>(); + topics.add(name); + DeleteTopicsResult dtr = client.deleteTopics(topics); + return dtr.topicNameValues() != null; + } + + public boolean createTopic(KafkaCreateTopicRequest kafkaCreateTopicRq) { + var partitions = Optional.ofNullable(kafkaCreateTopicRq.getPartitions()).orElse(1); + var replications = Optional.ofNullable(kafkaCreateTopicRq.getReplications()).orElse((short) 1); + var newTopic = new NewTopic(kafkaCreateTopicRq.getTopicName(), partitions, replications); + + CreateTopicsResult ctr = client.createTopics(List.of(newTopic)); + return ctr.values() != null; + } + + public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId) { + return client.listConsumerGroupOffsets(groupId); + } + + public Collection getAclInfo() throws InterruptedException, ExecutionException { + AclBindingFilter filter = new AclBindingFilter(ResourcePatternFilter.ANY, AccessControlEntryFilter.ANY); + var options = new DescribeAclsOptions().timeoutMs(1_000); + return client.describeAcls(filter, options).values().get(); + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java index 2be14e5717251..93e2ca309ab99 100644 --- a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/KafkaRuntimeConfigProducer.java @@ -17,7 +17,7 @@ public class KafkaRuntimeConfigProducer { // not "kafka.", because we also inspect env vars, which start with "KAFKA_" private static final String CONFIG_PREFIX = "kafka"; - + private static final String UI_CONFIG_PREFIX = CONFIG_PREFIX + ".ui"; private static final String GROUP_ID = "group.id"; @Produces @@ -29,7 +29,10 @@ public Map createKafkaRuntimeConfig(Config config, ApplicationCo for (String propertyName : config.getPropertyNames()) { String propertyNameLowerCase = propertyName.toLowerCase(); - if (!propertyNameLowerCase.startsWith(CONFIG_PREFIX)) { + if (propertyNameLowerCase.startsWith(UI_CONFIG_PREFIX)) { + config.getOptionalValue(propertyName, String.class).orElse(""); + } + if (!propertyNameLowerCase.startsWith(CONFIG_PREFIX) || propertyNameLowerCase.startsWith(UI_CONFIG_PREFIX)) { continue; } // Replace _ by . - This is because Kafka properties tend to use . and env variables use _ for every special diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/AbstractHttpRequestHandler.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/AbstractHttpRequestHandler.java new file mode 100644 index 0000000000000..ede6bc54f21f8 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/AbstractHttpRequestHandler.java @@ -0,0 +1,76 @@ +package io.quarkus.kafka.client.runtime.ui; + +import io.quarkus.arc.Arc; +import io.quarkus.arc.ManagedContext; +import io.vertx.core.Handler; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.ext.web.RoutingContext; + +public abstract class AbstractHttpRequestHandler implements Handler { + private final ManagedContext currentManagedContext; + private final Handler currentManagedContextTerminationHandler; + + public AbstractHttpRequestHandler() { + this.currentManagedContext = Arc.container().requestContext(); + this.currentManagedContextTerminationHandler = e -> currentManagedContext.terminate(); + } + + @Override + @SuppressWarnings("unchecked") // ignore currentManagedContextTerminationHandler types, just use Object + public void handle(final RoutingContext ctx) { + + if (currentManagedContext.isActive()) { + doHandle(ctx); + } else { + + currentManagedContext.activate(); + ctx.response() + .endHandler(currentManagedContextTerminationHandler) + .exceptionHandler(currentManagedContextTerminationHandler) + .closeHandler(currentManagedContextTerminationHandler); + + try { + doHandle(ctx); + } catch (Throwable t) { + currentManagedContext.terminate(); + throw t; + } + } + } + + public void doHandle(RoutingContext ctx) { + try { + HttpServerRequest request = ctx.request(); + + switch (request.method().name()) { + case "OPTIONS": + handleOptions(ctx); + break; + case "POST": + handlePost(ctx); + break; + case "GET": + handleGet(ctx); + break; + default: + ctx.next(); + break; + } + } catch (Exception e) { + ctx.fail(e); + } + } + + public abstract void handlePost(RoutingContext event); + + public abstract void handleGet(RoutingContext event); + + public abstract void handleOptions(RoutingContext event); + + protected String getRequestPath(RoutingContext event) { + HttpServerRequest request = event.request(); + return request.path(); + } + + //TODO: service methods for HTTP requests +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaTopicClient.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaTopicClient.java new file mode 100644 index 0000000000000..174ef04aa08b8 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaTopicClient.java @@ -0,0 +1,276 @@ +package io.quarkus.kafka.client.runtime.ui; + +import static io.quarkus.kafka.client.runtime.ui.util.ConsumerFactory.createConsumer; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.function.Function; +import java.util.stream.Collectors; + +import javax.annotation.PostConstruct; +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.utils.Bytes; + +import io.quarkus.kafka.client.runtime.ui.model.Order; +import io.quarkus.kafka.client.runtime.ui.model.converter.KafkaModelConverter; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessageCreateRequest; +import io.quarkus.kafka.client.runtime.ui.model.response.KafkaMessagePage; +import io.smallrye.common.annotation.Identifier; + +@ApplicationScoped +public class KafkaTopicClient { + // TODO: make configurable + private static final int RETRIES = 3; + + //TODO: inject me + private AdminClient adminClient; + + KafkaModelConverter modelConverter = new KafkaModelConverter(); + + @Inject + @Identifier("default-kafka-broker") + Map config; + + @PostConstruct + void init() { + Map conf = new HashMap<>(config); + conf.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); + adminClient = AdminClient.create(conf); + } + + private Producer createProducer() { + Map config = new HashMap<>(this.config); + + config.put(ProducerConfig.CLIENT_ID_CONFIG, "kafka-ui-producer-" + UUID.randomUUID()); + // TODO: make generic to support AVRO serializer + config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName()); + config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BytesSerializer.class.getName()); + + return new KafkaProducer<>(config); + } + + /** + * Reads the messages from particular topic. Offset for next page is returned within response. + * The first/last page offset could be retrieved with + * {@link KafkaTopicClient#getPagePartitionOffset(String, Collection, Order)} + * method. + * + * @param topicName topic to read messages from + * @param order ascending or descending. Defaults to descending (newest first) + * @param partitionOffsets the offset for page to be read + * @param pageSize size of read page + * @return page of messages, matching requested filters + */ + public KafkaMessagePage getTopicMessages( + String topicName, + Order order, + Map partitionOffsets, + int pageSize) + throws ExecutionException, InterruptedException { + assertParamsValid(pageSize, partitionOffsets); + + var requestedPartitions = partitionOffsets.keySet(); + assertRequestedPartitionsExist(topicName, requestedPartitions); + if (order == null) + order = Order.OLD_FIRST; + + var allPartitionsResult = getConsumerRecords(topicName, order, pageSize, requestedPartitions, partitionOffsets, + pageSize); + + Comparator> comparator = Comparator.comparing(ConsumerRecord::timestamp); + if (Order.NEW_FIRST == order) + comparator = comparator.reversed(); + allPartitionsResult.sort(comparator); + + // We might have too many values. Throw away newer items, which don't fit into page. + if (allPartitionsResult.size() > pageSize) { + allPartitionsResult = allPartitionsResult.subList(0, pageSize); + } + + var newOffsets = calculateNewPartitionOffset(partitionOffsets, allPartitionsResult, order, topicName); + var convertedResult = allPartitionsResult.stream() + .map(modelConverter::convert) + .collect(Collectors.toList()); + return new KafkaMessagePage(newOffsets, convertedResult); + } + + // Fail fast on wrong params, even before querying Kafka. + private void assertParamsValid(int pageSize, Map partitionOffsets) { + if (pageSize <= 0) + throw new IllegalArgumentException("Page size must be > 0."); + + if (partitionOffsets == null || partitionOffsets.isEmpty()) + throw new IllegalArgumentException("Partition offset map must be specified."); + + for (var partitionOffset : partitionOffsets.entrySet()) { + if (partitionOffset.getValue() < 0) + throw new IllegalArgumentException( + "Partition offset must be > 0."); + } + } + + private ConsumerRecords pollWhenReady(Consumer consumer) { + var attempts = 0; + var pullDuration = Duration.of(100, ChronoUnit.MILLIS); + var result = consumer.poll(pullDuration); + + while (result.isEmpty() && attempts < RETRIES) { + result = consumer.poll(pullDuration); + attempts++; + } + return result; + } + + /* + * FIXME: should consider compaction strategy, when our new offset not necessary = old + total records read, but some + * records might be deleted, so we'll end up seeing duplicates on some pages. + * Imagine this case: + * - page size = 10 + * - 30 messages pushed, value is incremental 1 ... 30. + * - message 10 gets removed, as message 15 has same key because of compaction + * - we request page 1. it had offset 0. we return values [1, 2, 3, ..., 9, 11], total of 10. We get new offset for page 2 = + * 0 + totalRecords = 10. + * - we request page 2. we read starting from offset = 10. There is no message with that offset, but we see message 11 again + * instead. + */ + private Map calculateNewPartitionOffset(Map oldPartitionOffset, + Collection> records, Order order, String topicName) { + var newOffsets = records.stream().map(ConsumerRecord::partition) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + + var newPartitionOffset = new HashMap(); + for (var partition : oldPartitionOffset.keySet()) { + // We should add in case we seek for oldest and reduce for newest. + var multiplier = Order.OLD_FIRST == order ? 1 : -1; + + // If new offset for partition is not there in the map - we didn't have records for that partition. So, just take the old offset. + var newOffset = oldPartitionOffset.get(partition) + multiplier * newOffsets.getOrDefault(partition, 0L); + newPartitionOffset.put(partition, newOffset); + } + return newPartitionOffset; + } + + private long getPosition(String topicName, int partition, Order order) { + try (var consumer = createConsumer(topicName, partition, this.config)) { + var topicPartition = new TopicPartition(topicName, partition); + if (Order.NEW_FIRST == order) { + consumer.seekToEnd(List.of(topicPartition)); + } else { + consumer.seekToBeginning(List.of(topicPartition)); + } + return consumer.position(topicPartition); + } + } + + public Map getPagePartitionOffset(String topicName, Collection requestedPartitions, Order order) + throws ExecutionException, InterruptedException { + assertRequestedPartitionsExist(topicName, requestedPartitions); + + var result = new HashMap(); + for (var requestedPartition : requestedPartitions) { + var maxPosition = getPosition(topicName, requestedPartition, order); + result.put(requestedPartition, maxPosition); + } + + return result; + } + + private List> getConsumerRecords(String topicName, Order order, int pageSize, + Collection requestedPartitions, Map start, int totalMessages) { + List> allPartitionsResult = new ArrayList<>(); + + // Requesting a full page from each partition and then filtering out redundant data. Thus, we'll ensure, we read data in historical order. + for (var requestedPartition : requestedPartitions) { + List> partitionResult = new ArrayList<>(); + var offset = start.get(requestedPartition); + try (var consumer = createConsumer(topicName, requestedPartition, this.config)) { + // Move pointer to currently read position. It might be different per partition, so requesting with offset per partition. + var partition = new TopicPartition(topicName, requestedPartition); + + var seekedOffset = Order.OLD_FIRST == order ? offset : Long.max(offset - pageSize, 0); + consumer.seek(partition, seekedOffset); + + var numberOfMessagesReadSoFar = 0; + var keepOnReading = true; + + while (keepOnReading) { + var records = pollWhenReady(consumer); + if (records.isEmpty()) + keepOnReading = false; + + for (var record : records) { + numberOfMessagesReadSoFar++; + partitionResult.add(record); + + if (numberOfMessagesReadSoFar >= totalMessages) { + keepOnReading = false; + break; + } + } + } + // We need to cut off result, if it was reset to 0, as we don't want see entries from old pages. + if (Order.NEW_FIRST == order && seekedOffset == 0 && partitionResult.size() > offset.intValue()) { + partitionResult.sort(Comparator.comparing(ConsumerRecord::timestamp)); + partitionResult = partitionResult.subList(0, offset.intValue()); + } + + } + allPartitionsResult.addAll(partitionResult); + } + return allPartitionsResult; + } + + private void assertRequestedPartitionsExist(String topicName, Collection requestedPartitions) + throws InterruptedException, ExecutionException { + var topicPartitions = partitions(topicName); + + if (!new HashSet<>(topicPartitions).containsAll(requestedPartitions)) { + throw new IllegalArgumentException(String.format( + "Requested messages from partition, that do not exist. Requested partitions: %s. Existing partitions: %s", + requestedPartitions, topicPartitions)); + } + } + + public void createMessage(KafkaMessageCreateRequest request) { + var record = new ProducerRecord<>(request.getTopic(), request.getPartition(), Bytes.wrap(request.getKey().getBytes()), + Bytes.wrap(request.getValue().getBytes()) + //TODO: support headers + ); + + try (var producer = createProducer()) { + producer.send(record); + } + } + + public List partitions(String topicName) throws ExecutionException, InterruptedException { + return adminClient.describeTopics(List.of(topicName)) + .allTopicNames() + .get() + .values().stream() + .reduce((a, b) -> { + throw new IllegalStateException( + "Requested info about single topic, but got result of multiple: " + a + ", " + b); + }) + .orElseThrow(() -> new IllegalStateException( + "Requested info about a topic, but nothing found. Topic name: " + topicName)) + .partitions().stream() + .map(TopicPartitionInfo::partition) + .collect(Collectors.toList()); + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java new file mode 100644 index 0000000000000..8b7d916de765b --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiHandler.java @@ -0,0 +1,131 @@ + +package io.quarkus.kafka.client.runtime.ui; + +import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static io.netty.handler.codec.http.HttpResponseStatus.OK; + +import java.util.concurrent.ExecutionException; + +import io.netty.handler.codec.http.HttpResponseStatus; +import io.quarkus.arc.Arc; +import io.quarkus.kafka.client.runtime.KafkaAdminClient; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaCreateTopicRequest; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessageCreateRequest; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessagesRequest; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaOffsetRequest; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.ext.web.RoutingContext; + +public class KafkaUiHandler extends AbstractHttpRequestHandler { + + @Override + public void handlePost(RoutingContext event) { + if (event.body() == null) { + endResponse(event, BAD_REQUEST, "Request body is null"); + return; + } + var body = event.body().asJsonObject(); + if (body == null) { + endResponse(event, BAD_REQUEST, "Request JSON body is null"); + return; + } + var action = body.getString("action"); + + var message = "OK"; + var error = ""; + + var webUtils = kafkaWebUiUtils(); + var adminClient = kafkaAdminClient(); + + boolean res = false; + if (null != action) { + try { + switch (action) { + case "getInfo": + message = webUtils.toJson(webUtils.getKafkaInfo()); + res = true; + break; + case "getAclInfo": + message = webUtils.toJson(webUtils.getAclInfo()); + res = true; + break; + case "createTopic": + var topicCreateRq = event.body().asPojo(KafkaCreateTopicRequest.class); + res = adminClient.createTopic(topicCreateRq); + message = webUtils.toJson(webUtils.getTopics()); + break; + case "deleteTopic": + res = adminClient.deleteTopic(body.getString("key")); + message = "{}"; + res = true; + break; + case "getTopics": + message = webUtils.toJson(webUtils.getTopics()); + res = true; + break; + case "topicMessages": + var msgRequest = event.body().asPojo(KafkaMessagesRequest.class); + message = webUtils.toJson(webUtils.getMessages(msgRequest)); + res = true; + break; + case "getOffset": + var request = event.body().asPojo(KafkaOffsetRequest.class); + message = webUtils.toJson(webUtils.getOffset(request)); + res = true; + break; + case "createMessage": + var rq = event.body().asPojo(KafkaMessageCreateRequest.class); + webUtils.createMessage(rq); + message = "{}"; + res = true; + break; + case "getPartitions": + var topicName = body.getString("topicName"); + message = webUtils.toJson(webUtils.partitions(topicName)); + res = true; + break; + default: + break; + } + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } catch (ExecutionException ex) { + throw new RuntimeException(ex); + } + } + + if (res) { + endResponse(event, OK, message); + } else { + message = "ERROR: " + error; + endResponse(event, BAD_REQUEST, message); + } + } + + private void endResponse(RoutingContext event, HttpResponseStatus status, String message) { + event.response().setStatusCode(status.code()); + event.response().end(message); + } + + private KafkaUiUtils kafkaWebUiUtils() { + return Arc.container().instance(KafkaUiUtils.class).get(); + } + + @Override + public void handleGet(RoutingContext event) { + //TODO: move pure get requests processing here + HttpServerRequest request = event.request(); + String path = request.path(); + endResponse(event, OK, "GET method is not supported yet. Path is: " + path); + } + + @Override + public void handleOptions(RoutingContext event) { + endResponse(event, OK, "OPTION method is not supported yet"); + } + + private KafkaAdminClient kafkaAdminClient() { + return Arc.container().instance(KafkaAdminClient.class).get(); + } + +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiRecorder.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiRecorder.java new file mode 100644 index 0000000000000..90afe8521cc11 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiRecorder.java @@ -0,0 +1,17 @@ +package io.quarkus.kafka.client.runtime.ui; + +import io.quarkus.runtime.annotations.Recorder; +import io.vertx.core.Handler; +import io.vertx.ext.web.RoutingContext; + +/** + * Handles requests from kafka UI and html/js of UI + */ +@Recorder +public class KafkaUiRecorder { + + public Handler kafkaControlHandler() { + return new KafkaUiHandler(); + } + +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java new file mode 100644 index 0000000000000..862fdcfeb2d2d --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/KafkaUiUtils.java @@ -0,0 +1,226 @@ +package io.quarkus.kafka.client.runtime.ui; + +import static io.quarkus.kafka.client.runtime.ui.util.ConsumerFactory.createConsumer; + +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import javax.inject.Singleton; + +import org.apache.kafka.clients.admin.ConsumerGroupDescription; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.clients.admin.MemberDescription; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.quarkus.kafka.client.runtime.KafkaAdminClient; +import io.quarkus.kafka.client.runtime.ui.model.Order; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessageCreateRequest; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaMessagesRequest; +import io.quarkus.kafka.client.runtime.ui.model.request.KafkaOffsetRequest; +import io.quarkus.kafka.client.runtime.ui.model.response.*; +import io.smallrye.common.annotation.Identifier; + +@Singleton +public class KafkaUiUtils { + + private final KafkaAdminClient kafkaAdminClient; + + private final KafkaTopicClient kafkaTopicClient; + private final ObjectMapper objectMapper; + + private final Map config; + + public KafkaUiUtils(KafkaAdminClient kafkaAdminClient, KafkaTopicClient kafkaTopicClient, ObjectMapper objectMapper, + @Identifier("default-kafka-broker") Map config) { + this.kafkaAdminClient = kafkaAdminClient; + this.kafkaTopicClient = kafkaTopicClient; + this.objectMapper = objectMapper; + this.config = config; + } + + public KafkaInfo getKafkaInfo() throws ExecutionException, InterruptedException { + var clusterInfo = getClusterInfo(); + var broker = clusterInfo.getController().asFullNodeName(); + var topics = getTopics(); + var consumerGroups = getConsumerGroups(); + return new KafkaInfo(broker, clusterInfo, topics, consumerGroups); + } + + public KafkaClusterInfo getClusterInfo() throws ExecutionException, InterruptedException { + return clusterInfo(kafkaAdminClient.getCluster()); + } + + private KafkaNode kafkaNode(Node node) { + return new KafkaNode(node.host(), node.port(), node.idString()); + } + + private KafkaClusterInfo clusterInfo(DescribeClusterResult dcr) throws InterruptedException, ExecutionException { + var controller = kafkaNode(dcr.controller().get()); + var nodes = new ArrayList(); + for (var node : dcr.nodes().get()) { + nodes.add(kafkaNode(node)); + } + var aclOperations = dcr.authorizedOperations().get(); + + var aclOperationsStr = new StringBuilder(); + if (aclOperations != null) { + for (var operation : dcr.authorizedOperations().get()) { + if (aclOperationsStr.length() == 0) { + aclOperationsStr.append(", "); + } + aclOperationsStr.append(operation.name()); + } + } else { + aclOperationsStr = new StringBuilder("NONE"); + } + + return new KafkaClusterInfo( + dcr.clusterId().get(), + controller, + nodes, + aclOperationsStr.toString()); + } + + public List getTopics() throws InterruptedException, ExecutionException { + var res = new ArrayList(); + for (TopicListing tl : kafkaAdminClient.getTopics()) { + res.add(kafkaTopic(tl)); + } + return res; + } + + private KafkaTopic kafkaTopic(TopicListing tl) throws ExecutionException, InterruptedException { + var partitions = partitions(tl.name()); + return new KafkaTopic( + tl.name(), + tl.topicId().toString(), + partitions.size(), + tl.isInternal(), + getTopicMessageCount(tl.name(), partitions)); + } + + public long getTopicMessageCount(String topicName, Collection partitions) + throws ExecutionException, InterruptedException { + var maxPartitionOffsetMap = kafkaTopicClient.getPagePartitionOffset(topicName, partitions, Order.NEW_FIRST); + return maxPartitionOffsetMap.values().stream() + .reduce(Long::sum) + .orElse(0L); + } + + public Collection partitions(String topicName) throws ExecutionException, InterruptedException { + return kafkaTopicClient.partitions(topicName); + } + + public KafkaMessagePage getMessages(KafkaMessagesRequest request) throws ExecutionException, InterruptedException { + return kafkaTopicClient.getTopicMessages(request.getTopicName(), request.getOrder(), request.getPartitionOffset(), + request.getPageSize()); + } + + public void createMessage(KafkaMessageCreateRequest request) { + kafkaTopicClient.createMessage(request); + } + + public List getConsumerGroups() throws InterruptedException, ExecutionException { + List res = new ArrayList<>(); + for (ConsumerGroupDescription cgd : kafkaAdminClient.getConsumerGroups()) { + + var metadata = kafkaAdminClient.listConsumerGroupOffsets(cgd.groupId()) + .partitionsToOffsetAndMetadata().get(); + var members = cgd.members().stream() + .map(member -> new KafkaConsumerGroupMember( + member.consumerId(), + member.clientId(), + member.host(), + getPartitionAssignments(metadata, member))) + .collect(Collectors.toSet()); + + res.add(new KafkaConsumerGroup( + cgd.groupId(), + cgd.state().name(), + cgd.coordinator().host(), + cgd.coordinator().id(), + cgd.partitionAssignor(), + getTotalLag(members), + members)); + } + return res; + } + + private long getTotalLag(Set members) { + return members.stream() + .map(KafkaConsumerGroupMember::getPartitions) + .flatMap(Collection::stream) + .map(KafkaConsumerGroupMemberPartitionAssignment::getLag) + .reduce(Long::sum) + .orElse(0L); + } + + private Set getPartitionAssignments( + Map topicOffsetMap, MemberDescription member) { + var topicPartitions = member.assignment().topicPartitions(); + try (var consumer = createConsumer(topicPartitions, config)) { + var endOffsets = consumer.endOffsets(topicPartitions); + + return topicPartitions.stream() + .map(tp -> { + var topicOffset = Optional.ofNullable(topicOffsetMap.get(tp)) + .map(OffsetAndMetadata::offset) + .orElse(0L); + return new KafkaConsumerGroupMemberPartitionAssignment(tp.partition(), tp.topic(), + getLag(topicOffset, endOffsets.get(tp))); + }) + .collect(Collectors.toSet()); + } + } + + private long getLag(long topicOffset, long endOffset) { + return endOffset - topicOffset; + } + + public Map getOffset(KafkaOffsetRequest request) throws ExecutionException, InterruptedException { + return kafkaTopicClient.getPagePartitionOffset(request.getTopicName(), request.getRequestedPartitions(), + request.getOrder()); + } + + public KafkaAclInfo getAclInfo() throws InterruptedException, ExecutionException { + var clusterInfo = clusterInfo(kafkaAdminClient.getCluster()); + var entries = new ArrayList(); + //TODO: fix it after proper error message impl + try { + var acls = kafkaAdminClient.getAclInfo(); + for (var acl : acls) { + var entry = new KafkaAclEntry( + acl.entry().operation().name(), + acl.entry().principal(), + acl.entry().permissionType().name(), + acl.pattern().toString()); + entries.add(entry); + } + } catch (Exception e) { + // this mostly means that ALC controller is absent + } + return new KafkaAclInfo( + clusterInfo.getId(), + clusterInfo.getController().asFullNodeName(), + clusterInfo.getAclOperations(), + entries); + } + + public String toJson(Object o) { + String res; + try { + res = objectMapper.writeValueAsString(o); + } catch (JsonProcessingException ex) { + //FIXME: + res = ""; + } + return res; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/Order.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/Order.java new file mode 100644 index 0000000000000..a94a5565c4a0f --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/Order.java @@ -0,0 +1,6 @@ +package io.quarkus.kafka.client.runtime.ui.model; + +public enum Order { + OLD_FIRST, + NEW_FIRST +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/converter/KafkaModelConverter.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/converter/KafkaModelConverter.java new file mode 100644 index 0000000000000..5eaad0173d129 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/converter/KafkaModelConverter.java @@ -0,0 +1,20 @@ +package io.quarkus.kafka.client.runtime.ui.model.converter; + +import java.util.Optional; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.utils.Bytes; + +import io.quarkus.kafka.client.runtime.ui.model.response.KafkaMessage; + +public class KafkaModelConverter { + public KafkaMessage convert(ConsumerRecord message) { + return new KafkaMessage( + message.topic(), + message.partition(), + message.offset(), + message.timestamp(), + Optional.ofNullable(message.key()).map(Bytes::toString).orElse(null), + Optional.ofNullable(message.value()).map(Bytes::toString).orElse(null)); + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaCreateTopicRequest.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaCreateTopicRequest.java new file mode 100644 index 0000000000000..8fbe12f9c2500 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaCreateTopicRequest.java @@ -0,0 +1,28 @@ +package io.quarkus.kafka.client.runtime.ui.model.request; + +public class KafkaCreateTopicRequest { + private String topicName; + private Integer partitions; + private Short replications; + + public KafkaCreateTopicRequest() { + } + + public KafkaCreateTopicRequest(String topicName, Integer partitions, Short replications) { + this.topicName = topicName; + this.partitions = partitions; + this.replications = replications; + } + + public String getTopicName() { + return topicName; + } + + public Integer getPartitions() { + return partitions; + } + + public Short getReplications() { + return replications; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaMessageCreateRequest.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaMessageCreateRequest.java new file mode 100644 index 0000000000000..5dcbebb32fdaa --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaMessageCreateRequest.java @@ -0,0 +1,39 @@ +package io.quarkus.kafka.client.runtime.ui.model.request; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties("action") +public class KafkaMessageCreateRequest { + + //TODO: add headers + private String topic; + private Integer partition; + private String value; + private String key; + + public KafkaMessageCreateRequest() { + } + + public KafkaMessageCreateRequest(String topic, Integer partition, String value, String key) { + this.topic = topic; + this.partition = partition; + this.value = value; + this.key = key; + } + + public String getTopic() { + return topic; + } + + public Integer getPartition() { + return partition; + } + + public String getValue() { + return value; + } + + public String getKey() { + return key; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaMessagesRequest.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaMessagesRequest.java new file mode 100644 index 0000000000000..71fda0e79d8a6 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaMessagesRequest.java @@ -0,0 +1,51 @@ +package io.quarkus.kafka.client.runtime.ui.model.request; + +import java.util.Map; + +import io.quarkus.kafka.client.runtime.ui.model.Order; + +public class KafkaMessagesRequest { + private String topicName; + private Order order; + private int pageSize; + private Integer pageNumber; + + private Map partitionOffset; + + public KafkaMessagesRequest() { + } + + public KafkaMessagesRequest(String topicName, Order order, int pageSize, int pageNumber) { + this.topicName = topicName; + this.order = order; + this.pageSize = pageSize; + this.pageNumber = pageNumber; + } + + public KafkaMessagesRequest(String topicName, Order order, int pageSize, Map partitionOffset) { + this.topicName = topicName; + this.order = order; + this.pageSize = pageSize; + this.partitionOffset = partitionOffset; + } + + public String getTopicName() { + return topicName; + } + + public Order getOrder() { + return order; + } + + public int getPageSize() { + return pageSize; + } + + public int getPageNumber() { + return pageNumber; + } + + public Map getPartitionOffset() { + return partitionOffset; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaOffsetRequest.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaOffsetRequest.java new file mode 100644 index 0000000000000..f9fa52cdb7369 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/request/KafkaOffsetRequest.java @@ -0,0 +1,32 @@ +package io.quarkus.kafka.client.runtime.ui.model.request; + +import java.util.List; + +import io.quarkus.kafka.client.runtime.ui.model.Order; + +public class KafkaOffsetRequest { + private String topicName; + private List requestedPartitions; + private Order order; + + public KafkaOffsetRequest() { + } + + public KafkaOffsetRequest(String topicName, List requestedPartitions, Order order) { + this.topicName = topicName; + this.requestedPartitions = requestedPartitions; + this.order = order; + } + + public String getTopicName() { + return topicName; + } + + public List getRequestedPartitions() { + return requestedPartitions; + } + + public Order getOrder() { + return order; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaAclEntry.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaAclEntry.java new file mode 100644 index 0000000000000..b32a0d729f6b7 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaAclEntry.java @@ -0,0 +1,34 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +public class KafkaAclEntry { + private String operation; + private String principal; + private String perm; + private String pattern; + + public KafkaAclEntry() { + } + + public KafkaAclEntry(String operation, String principal, String perm, String pattern) { + this.operation = operation; + this.principal = principal; + this.perm = perm; + this.pattern = pattern; + } + + public String getOperation() { + return operation; + } + + public String getPrincipal() { + return principal; + } + + public String getPerm() { + return perm; + } + + public String getPattern() { + return pattern; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaAclInfo.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaAclInfo.java new file mode 100644 index 0000000000000..4e53287f220b7 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaAclInfo.java @@ -0,0 +1,37 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +import java.util.ArrayList; +import java.util.List; + +public class KafkaAclInfo { + private String clusterId; + private String broker; + private String aclOperations; + private List entries = new ArrayList<>(); + + public KafkaAclInfo() { + } + + public KafkaAclInfo(String clusterId, String broker, String aclOperations, List entries) { + this.clusterId = clusterId; + this.broker = broker; + this.aclOperations = aclOperations; + this.entries = entries; + } + + public String getClusterId() { + return clusterId; + } + + public String getBroker() { + return broker; + } + + public String getAclOperations() { + return aclOperations; + } + + public List getEntries() { + return entries; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaClusterInfo.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaClusterInfo.java new file mode 100644 index 0000000000000..71e8e67c69b11 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaClusterInfo.java @@ -0,0 +1,37 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +import java.util.ArrayList; +import java.util.List; + +public class KafkaClusterInfo { + private String id; + private KafkaNode controller; + private List nodes = new ArrayList<>(); + private String aclOperations; + + public KafkaClusterInfo() { + } + + public KafkaClusterInfo(String id, KafkaNode controller, List nodes, String aclOperations) { + this.id = id; + this.controller = controller; + this.nodes = nodes; + this.aclOperations = aclOperations; + } + + public String getId() { + return id; + } + + public KafkaNode getController() { + return controller; + } + + public List getNodes() { + return nodes; + } + + public String getAclOperations() { + return aclOperations; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroup.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroup.java new file mode 100644 index 0000000000000..e6506837534eb --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroup.java @@ -0,0 +1,56 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +import java.util.Collection; + +public class KafkaConsumerGroup { + private String name; + private String state; + private String coordinatorHost; + private int coordinatorId; + // The assignment strategy + private String protocol; + private long lag; + private Collection members; + + public KafkaConsumerGroup() { + } + + public KafkaConsumerGroup(String name, String state, String coordinatorHost, int coordinatorId, String protocol, long lag, + Collection members) { + this.name = name; + this.state = state; + this.coordinatorHost = coordinatorHost; + this.coordinatorId = coordinatorId; + this.protocol = protocol; + this.lag = lag; + this.members = members; + } + + public String getName() { + return name; + } + + public String getState() { + return state; + } + + public String getCoordinatorHost() { + return coordinatorHost; + } + + public int getCoordinatorId() { + return coordinatorId; + } + + public String getProtocol() { + return protocol; + } + + public long getLag() { + return lag; + } + + public Collection getMembers() { + return members; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroupMember.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroupMember.java new file mode 100644 index 0000000000000..338890414b702 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroupMember.java @@ -0,0 +1,38 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +import java.util.Collection; + +public class KafkaConsumerGroupMember { + private String memberId; + private String clientId; + private String host; + + private Collection partitions; + + public KafkaConsumerGroupMember() { + } + + public KafkaConsumerGroupMember(String memberId, String clientId, String host, + Collection partitions) { + this.memberId = memberId; + this.clientId = clientId; + this.host = host; + this.partitions = partitions; + } + + public String getMemberId() { + return memberId; + } + + public String getClientId() { + return clientId; + } + + public String getHost() { + return host; + } + + public Collection getPartitions() { + return partitions; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroupMemberPartitionAssignment.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroupMemberPartitionAssignment.java new file mode 100644 index 0000000000000..4a722e76d6385 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaConsumerGroupMemberPartitionAssignment.java @@ -0,0 +1,29 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +public class KafkaConsumerGroupMemberPartitionAssignment { + + private int partition; + private String topic; + private long lag; + + public KafkaConsumerGroupMemberPartitionAssignment() { + } + + public KafkaConsumerGroupMemberPartitionAssignment(int partition, String topic, long lag) { + this.partition = partition; + this.topic = topic; + this.lag = lag; + } + + public int getPartition() { + return partition; + } + + public String getTopic() { + return topic; + } + + public long getLag() { + return lag; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaInfo.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaInfo.java new file mode 100644 index 0000000000000..f8a63d09638f5 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaInfo.java @@ -0,0 +1,37 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +import java.util.List; + +public class KafkaInfo { + private String broker; + private KafkaClusterInfo clusterInfo; + private List topics; + private List consumerGroups; + + public KafkaInfo() { + } + + public KafkaInfo(String broker, KafkaClusterInfo clusterInfo, List topics, + List consumerGroups) { + this.broker = broker; + this.clusterInfo = clusterInfo; + this.topics = topics; + this.consumerGroups = consumerGroups; + } + + public String getBroker() { + return broker; + } + + public List getTopics() { + return topics; + } + + public KafkaClusterInfo getClusterInfo() { + return clusterInfo; + } + + public List getConsumerGroups() { + return consumerGroups; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaMessage.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaMessage.java new file mode 100644 index 0000000000000..4b4e246994a94 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaMessage.java @@ -0,0 +1,43 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +public class KafkaMessage { + private final String topic; + private final int partition; + private final long offset; + private final long timestamp; + private final String key; + private final String value; + + public KafkaMessage(String topic, int partition, long offset, long timestamp, String key, String value) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.timestamp = timestamp; + this.key = key; + this.value = value; + } + + public String getTopic() { + return topic; + } + + public int getPartition() { + return partition; + } + + public long getOffset() { + return offset; + } + + public long getTimestamp() { + return timestamp; + } + + public String getKey() { + return key; + } + + public String getValue() { + return value; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaMessagePage.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaMessagePage.java new file mode 100644 index 0000000000000..c57aaa6ce5178 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaMessagePage.java @@ -0,0 +1,22 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +import java.util.Collection; +import java.util.Map; + +public class KafkaMessagePage { + private final Map nextOffsets; + private final Collection messages; + + public KafkaMessagePage(Map nextOffsets, Collection messages) { + this.nextOffsets = nextOffsets; + this.messages = messages; + } + + public Map getNextOffsets() { + return nextOffsets; + } + + public Collection getMessages() { + return messages; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaNode.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaNode.java new file mode 100644 index 0000000000000..137645a7c29ee --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaNode.java @@ -0,0 +1,32 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +public class KafkaNode { + private String host; + private int port; + private String id; + + public KafkaNode() { + } + + public KafkaNode(String host, int port, String id) { + this.host = host; + this.port = port; + this.id = id; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getId() { + return id; + } + + public String asFullNodeName() { + return host + ":" + port; + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaTopic.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaTopic.java new file mode 100644 index 0000000000000..ab5595d7a8488 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/model/response/KafkaTopic.java @@ -0,0 +1,46 @@ +package io.quarkus.kafka.client.runtime.ui.model.response; + +public class KafkaTopic { + private String name; + private String topicId; + private int partitionsCount; + private boolean internal; + private long nmsg = 0; + + public KafkaTopic() { + } + + public KafkaTopic(String name, String topicId, int partitionsCount, boolean internal, long nmsg) { + this.name = name; + this.topicId = topicId; + this.partitionsCount = partitionsCount; + this.internal = internal; + this.nmsg = nmsg; + } + + public String getName() { + return name; + } + + public String getTopicId() { + return topicId; + } + + public int getPartitionsCount() { + return partitionsCount; + } + + public boolean isInternal() { + return internal; + } + + public long getNmsg() { + return nmsg; + } + + public String toString() { + StringBuilder sb = new StringBuilder(name); + sb.append(" : ").append(topicId); + return sb.toString(); + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/util/ConsumerFactory.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/util/ConsumerFactory.java new file mode 100644 index 0000000000000..be2c140530860 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/runtime/ui/util/ConsumerFactory.java @@ -0,0 +1,37 @@ +package io.quarkus.kafka.client.runtime.ui.util; + +import java.util.*; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.BytesDeserializer; +import org.apache.kafka.common.utils.Bytes; + +public class ConsumerFactory { + + public static Consumer createConsumer(String topicName, Integer requestedPartition, + Map commonConfig) { + return createConsumer(List.of(new TopicPartition(topicName, requestedPartition)), commonConfig); + } + + // We must create a new instance per request, as we might have multiple windows open, each with different pagination, filter and thus different cursor. + public static Consumer createConsumer(Collection requestedPartitions, + Map commonConfig) { + Map config = new HashMap<>(commonConfig); + //TODO: make generic? + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class); + + config.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui-" + UUID.randomUUID()); + + // For pagination, we require manual management of offset pointer. + config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + + var consumer = new KafkaConsumer(config); + consumer.assign(requestedPartitions); + return consumer; + } + +} diff --git a/extensions/vertx-http/deployment/pom.xml b/extensions/vertx-http/deployment/pom.xml index f73b67dd3c39e..a5154ddd7b939 100644 --- a/extensions/vertx-http/deployment/pom.xml +++ b/extensions/vertx-http/deployment/pom.xml @@ -52,6 +52,16 @@ bootstrap provided + + org.webjars + bootstrap-multiselect + provided + + + org.webjars.npm + bootstrap-icons + provided + org.webjars font-awesome @@ -196,6 +206,55 @@ + + + org.webjars + bootstrap-multiselect + ${webjar.bootstrap-multiselect.version} + jar + true + ${project.build.directory}/classes/dev-static/js/ + **/bootstrap-multiselect.js + + + + + + org.webjars + bootstrap-multiselect + ${webjar.bootstrap-multiselect.version} + jar + true + ${project.build.directory}/classes/dev-static/css/ + **/bootstrap-multiselect.css + + + + + + org.webjars.npm + bootstrap-icons + ${webjar.bootstrap-icons.version} + jar + true + ${project.build.directory}/classes/dev-static/css/ + **/font/bootstrap-icons.css + + + + + + org.webjars.npm + bootstrap-icons + ${webjar.bootstrap-icons.version} + jar + true + ${project.build.directory}/classes/dev-static/css/fonts/ + **/font/fonts/ + + + + org.webjars diff --git a/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/console/DevConsoleProcessor.java b/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/console/DevConsoleProcessor.java index 0943aeda594d1..6c34e448daabe 100644 --- a/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/console/DevConsoleProcessor.java +++ b/extensions/vertx-http/deployment/src/main/java/io/quarkus/vertx/http/deployment/devmode/console/DevConsoleProcessor.java @@ -64,6 +64,7 @@ import io.quarkus.devconsole.spi.DevConsoleRouteBuildItem; import io.quarkus.devconsole.spi.DevConsoleRuntimeTemplateInfoBuildItem; import io.quarkus.devconsole.spi.DevConsoleTemplateInfoBuildItem; +import io.quarkus.devconsole.spi.DevConsoleWebjarBuildItem; import io.quarkus.maven.dependency.ArtifactKey; import io.quarkus.maven.dependency.GACT; import io.quarkus.netty.runtime.virtual.VirtualChannel; @@ -88,6 +89,7 @@ import io.quarkus.runtime.RuntimeValue; import io.quarkus.runtime.TemplateHtmlBuilder; import io.quarkus.utilities.OS; +import io.quarkus.vertx.http.deployment.BodyHandlerBuildItem; import io.quarkus.vertx.http.deployment.HttpRootPathBuildItem; import io.quarkus.vertx.http.deployment.NonApplicationRootPathBuildItem; import io.quarkus.vertx.http.deployment.RouteBuildItem; @@ -387,6 +389,46 @@ public WebJarBuildItem setupWebJar( .build(); } + @BuildStep(onlyIf = IsDevelopment.class) + public void setupDevConsoleWebjar( + List devConsoleWebjarBuildItems, + BuildProducer webJarBuildItemBuildProducer, + LaunchModeBuildItem launchModeBuildItem) { + if (launchModeBuildItem.getDevModeType().orElse(null) != DevModeType.LOCAL) { + return; + } + for (DevConsoleWebjarBuildItem devConsoleWebjar : devConsoleWebjarBuildItems) { + webJarBuildItemBuildProducer.produce(WebJarBuildItem.builder() + .artifactKey(devConsoleWebjar.getArtifactKey()) + .root(devConsoleWebjar.getRoot()) + .useDefaultQuarkusBranding(devConsoleWebjar.getUseDefaultQuarkusBranding()) + .onlyCopyNonArtifactFiles(devConsoleWebjar.getOnlyCopyNonArtifactFiles()) + .build()); + } + } + + @Record(ExecutionTime.RUNTIME_INIT) + @BuildStep(onlyIf = IsDevelopment.class) + public void setupDevConsoleRoutes( + List devConsoleWebjarBuildItems, + DevConsoleRecorder recorder, + NonApplicationRootPathBuildItem nonApplicationRootPathBuildItem, + ShutdownContextBuildItem shutdownContext, + BuildProducer routeBuildItemBuildProducer, + WebJarResultsBuildItem webJarResultsBuildItem) { + + for (DevConsoleWebjarBuildItem webjarBuildItem : devConsoleWebjarBuildItems) { + WebJarResultsBuildItem.WebJarResult result = webJarResultsBuildItem.byArtifactKey(webjarBuildItem.getArtifactKey()); + if (result == null) { + continue; + } + routeBuildItemBuildProducer.produce(nonApplicationRootPathBuildItem.routeBuilder() + .route("dev/" + webjarBuildItem.getRouteRoot() + "/*") + .handler(recorder.fileSystemStaticHandler(result.getWebRootConfigurations(), shutdownContext)) + .build()); + } + } + @BuildStep(onlyIf = { IsDevelopment.class }) public DevConsoleTemplateInfoBuildItem config(List serviceDescriptions) { return new DevConsoleTemplateInfoBuildItem("devServices", serviceDescriptions); @@ -404,7 +446,8 @@ public void setupDevConsoleRoutes( ShutdownContextBuildItem shutdownContext, BuildProducer routeBuildItemBuildProducer, WebJarResultsBuildItem webJarResultsBuildItem, - CurateOutcomeBuildItem curateOutcomeBuildItem) { + CurateOutcomeBuildItem curateOutcomeBuildItem, + BodyHandlerBuildItem bodyHandlerBuildItem) { WebJarResultsBuildItem.WebJarResult result = webJarResultsBuildItem.byArtifactKey(DEVCONSOLE_WEBJAR_ARTIFACT_KEY); @@ -432,7 +475,8 @@ public void setupDevConsoleRoutes( NonApplicationRootPathBuildItem.Builder builder = nonApplicationRootPathBuildItem.routeBuilder() .routeFunction( "dev/" + groupAndArtifact.getKey() + "." + groupAndArtifact.getValue() + "/" + i.getPath(), - new RuntimeDevConsoleRoute(i.getMethod())); + new RuntimeDevConsoleRoute(i.getMethod(), + i.isBodyHandlerRequired() ? bodyHandlerBuildItem.getHandler() : null)); if (i.isBlockingHandler()) { builder.blockingRoute(); } diff --git a/extensions/vertx-http/dev-console-spi/src/main/java/io/quarkus/devconsole/spi/DevConsoleWebjarBuildItem.java b/extensions/vertx-http/dev-console-spi/src/main/java/io/quarkus/devconsole/spi/DevConsoleWebjarBuildItem.java new file mode 100644 index 0000000000000..01d16ceea3e3c --- /dev/null +++ b/extensions/vertx-http/dev-console-spi/src/main/java/io/quarkus/devconsole/spi/DevConsoleWebjarBuildItem.java @@ -0,0 +1,105 @@ +package io.quarkus.devconsole.spi; + +import io.quarkus.builder.item.MultiBuildItem; +import io.quarkus.maven.dependency.GACT; + +public final class DevConsoleWebjarBuildItem extends MultiBuildItem { + /** + * ArtifactKey pointing to the web jar. Has to be one of the applications dependencies. + */ + private final GACT artifactKey; + + /** + * Root inside the webJar starting from which resources are unpacked. + */ + private final String root; + + /** + * Only copy resources of the webjar which are either user overridden, or contain variables. + */ + private final boolean onlyCopyNonArtifactFiles; + + /** + * Defines whether Quarkus can override resources of the webjar with Quarkus internal files. + */ + private final boolean useDefaultQuarkusBranding; + + /** + * The root of the route to expose resources of the webjar + */ + private final String routeRoot; + + private DevConsoleWebjarBuildItem(Builder builder) { + this.artifactKey = builder.artifactKey; + this.root = builder.root; + this.useDefaultQuarkusBranding = builder.useDefaultQuarkusBranding; + this.onlyCopyNonArtifactFiles = builder.onlyCopyNonArtifactFiles; + this.routeRoot = builder.routeRoot; + } + + public GACT getArtifactKey() { + return artifactKey; + } + + public String getRoot() { + return root; + } + + public boolean getUseDefaultQuarkusBranding() { + return useDefaultQuarkusBranding; + } + + public boolean getOnlyCopyNonArtifactFiles() { + return onlyCopyNonArtifactFiles; + } + + public String getRouteRoot() { + return routeRoot; + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private GACT artifactKey; + private String root; + private boolean useDefaultQuarkusBranding = true; + private boolean onlyCopyNonArtifactFiles = true; + private String routeRoot; + + public Builder artifactKey(GACT artifactKey) { + this.artifactKey = artifactKey; + return this; + } + + public Builder root(String root) { + this.root = root; + + if (this.root != null && this.root.startsWith("/")) { + this.root = this.root.substring(1); + } + + return this; + } + + public Builder routeRoot(String route) { + this.routeRoot = route; + return this; + } + + public Builder useDefaultQuarkusBranding(boolean useDefaultQuarkusBranding) { + this.useDefaultQuarkusBranding = useDefaultQuarkusBranding; + return this; + } + + public Builder onlyCopyNonArtifactFiles(boolean onlyCopyNonArtifactFiles) { + this.onlyCopyNonArtifactFiles = onlyCopyNonArtifactFiles; + return this; + } + + public DevConsoleWebjarBuildItem build() { + return new DevConsoleWebjarBuildItem(this); + } + } +} diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/RuntimeDevConsoleRoute.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/RuntimeDevConsoleRoute.java index ff5a05a705691..d3adff64c5e0a 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/RuntimeDevConsoleRoute.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/devmode/RuntimeDevConsoleRoute.java @@ -2,24 +2,37 @@ import java.util.function.Consumer; +import io.vertx.core.Handler; import io.vertx.core.http.HttpMethod; import io.vertx.ext.web.Route; +import io.vertx.ext.web.RoutingContext; public class RuntimeDevConsoleRoute implements Consumer { private String method; + private Handler bodyHandler; + public RuntimeDevConsoleRoute() { } - public RuntimeDevConsoleRoute(String method) { + public RuntimeDevConsoleRoute(String method, Handler hasBodyHandler) { this.method = method; + this.bodyHandler = hasBodyHandler; } public String getMethod() { return method; } + public Handler getBodyHandler() { + return bodyHandler; + } + + public void setBodyHandler(Handler bodyHandler) { + this.bodyHandler = bodyHandler; + } + public RuntimeDevConsoleRoute setMethod(String method) { this.method = method; return this; @@ -29,5 +42,8 @@ public RuntimeDevConsoleRoute setMethod(String method) { public void accept(Route route) { route.method(HttpMethod.valueOf(method)) .order(-100); + if (bodyHandler != null) { + route.handler(bodyHandler); + } } }