Skip to content

Commit

Permalink
Transformations: Add support for an inner join transformation (#53865)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexKaracaoglu committed Aug 18, 2022
1 parent fb40b80 commit a3c1cd8
Show file tree
Hide file tree
Showing 8 changed files with 814 additions and 205 deletions.
29 changes: 29 additions & 0 deletions docs/sources/panels/transform-data/transformation-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -568,3 +568,32 @@ Here is the result after adding a Limit transformation with a value of '3':
| 2020-07-07 11:34:20 | Temperature | 25 |
| 2020-07-07 11:34:20 | Humidity | 22 |
| 2020-07-07 10:32:20 | Humidity | 29 |

## Join by field (Inner join)

Use this transformation to combine the results from multiple queries (combining on a passed join field or the first time column) into one single result and drop rows where a successful join isn't able to occur - performing an inner join.

In the example below, we have two queries returning table data. It is visualized as two separate tables before applying the inner join transformation.

Query A:

| Time | Job | Uptime |
| ------------------- | ------- | --------- |
| 2020-07-07 11:34:20 | node | 25260122 |
| 2020-07-07 11:24:20 | postgre | 123001233 |
| 2020-07-07 11:14:20 | postgre | 345001233 |

Query B:

| Time | Server | Errors |
| ------------------- | -------- | ------ |
| 2020-07-07 11:34:20 | server 1 | 15 |
| 2020-07-07 11:24:20 | server 2 | 5 |
| 2020-07-07 11:04:20 | server 3 | 10 |

Result after applying the inner join transformation:

| Time | Job | Uptime | Server | Errors |
| ------------------- | ------- | --------- | -------- | ------ |
| 2020-07-07 11:34:20 | node | 25260122 | server 1 | 15 |
| 2020-07-07 11:24:20 | postgre | 123001233 | server 2 | 5 |
1 change: 1 addition & 0 deletions packages/grafana-data/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"d3-interpolate": "1.4.0",
"date-fns": "2.29.1",
"eventemitter3": "4.0.7",
"fast_array_intersect": "1.1.0",
"history": "4.10.1",
"lodash": "4.17.21",
"marked": "4.0.18",
Expand Down
3 changes: 2 additions & 1 deletion packages/grafana-data/src/transformations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export {
ByNamesMatcherMode,
} from './matchers/nameMatcher';
export type { RenameByRegexTransformerOptions } from './transformers/renameByRegex';
export { outerJoinDataFrames } from './transformers/joinDataFrames';
/** @deprecated -- will be removed in future versions */
export { joinDataFrames as outerJoinDataFrames } from './transformers/joinDataFrames';
export * from './transformers/histogram';
export { ensureTimeField } from './transformers/convertFieldType';
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ import { mockTransformationsRegistry } from '../../utils/tests/mockTransformatio
import { ArrayVector } from '../../vector';

import { calculateFieldTransformer } from './calculateField';
import { isLikelyAscendingVector, outerJoinDataFrames } from './joinDataFrames';
import { isLikelyAscendingVector, joinDataFrames } from './joinDataFrames';
import { JoinMode } from './seriesToColumns';

describe('align frames', () => {
beforeAll(() => {
mockTransformationsRegistry([calculateFieldTransformer]);
});

it('by first time field', () => {
describe('by first time field', () => {
const series1 = toDataFrame({
fields: [
{ name: 'TheTime', type: FieldType.time, values: [1000, 2000] },
{ name: 'A', type: FieldType.number, values: [1, 100] },
],
});

const series2 = toDataFrame({
fields: [
{ name: '_time', type: FieldType.time, values: [1000, 1500, 2000] },
Expand All @@ -28,56 +28,106 @@ describe('align frames', () => {
],
});

const out = outerJoinDataFrames({ frames: [series1, series2] })!;
expect(
out.fields.map((f) => ({
name: f.name,
values: f.values.toArray(),
}))
).toMatchInlineSnapshot(`
Array [
Object {
"name": "TheTime",
"values": Array [
1000,
1500,
2000,
],
},
Object {
"name": "A",
"values": Array [
1,
undefined,
100,
],
},
Object {
"name": "A",
"values": Array [
2,
20,
200,
],
},
Object {
"name": "B",
"values": Array [
3,
30,
300,
],
},
Object {
"name": "C",
"values": Array [
"first",
"second",
"third",
],
},
]
`);
it('should perform an outer join', () => {
const out = joinDataFrames({ frames: [series1, series2] })!;
expect(
out.fields.map((f) => ({
name: f.name,
values: f.values.toArray(),
}))
).toMatchInlineSnapshot(`
Array [
Object {
"name": "TheTime",
"values": Array [
1000,
1500,
2000,
],
},
Object {
"name": "A",
"values": Array [
1,
undefined,
100,
],
},
Object {
"name": "A",
"values": Array [
2,
20,
200,
],
},
Object {
"name": "B",
"values": Array [
3,
30,
300,
],
},
Object {
"name": "C",
"values": Array [
"first",
"second",
"third",
],
},
]
`);
});

it('should perform an inner join', () => {
const out = joinDataFrames({ frames: [series1, series2], mode: JoinMode.inner })!;
expect(
out.fields.map((f) => ({
name: f.name,
values: f.values.toArray(),
}))
).toMatchInlineSnapshot(`
Array [
Object {
"name": "TheTime",
"values": Array [
1000,
2000,
],
},
Object {
"name": "A",
"values": Array [
1,
100,
],
},
Object {
"name": "A",
"values": Array [
2,
200,
],
},
Object {
"name": "B",
"values": Array [
3,
300,
],
},
Object {
"name": "C",
"values": Array [
"first",
"third",
],
},
]
`);
});
});

it('unsorted input keep indexes', () => {
Expand All @@ -96,7 +146,7 @@ describe('align frames', () => {
],
});

let out = outerJoinDataFrames({ frames: [series1, series3], keepOriginIndices: true })!;
let out = joinDataFrames({ frames: [series1, series3], keepOriginIndices: true })!;
expect(
out.fields.map((f) => ({
name: f.name,
Expand Down Expand Up @@ -151,7 +201,7 @@ describe('align frames', () => {
`);

// Fast path still adds origin indecies
out = outerJoinDataFrames({ frames: [series1], keepOriginIndices: true })!;
out = joinDataFrames({ frames: [series1], keepOriginIndices: true })!;
expect(
out.fields.map((f) => ({
name: f.name,
Expand Down Expand Up @@ -189,7 +239,7 @@ describe('align frames', () => {
],
});

const out = outerJoinDataFrames({ frames: [series1], keepOriginIndices: true })!;
const out = joinDataFrames({ frames: [series1], keepOriginIndices: true })!;
expect(
out.fields.map((f) => ({
name: f.name,
Expand Down Expand Up @@ -236,7 +286,7 @@ describe('align frames', () => {
],
});

const out = outerJoinDataFrames({ frames: [series1, series3] })!;
const out = joinDataFrames({ frames: [series1, series3] })!;
expect(
out.fields.map((f) => ({
name: f.name,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import intersect from 'fast_array_intersect';

import { getTimeField, sortDataFrame } from '../../dataframe';
import { DataFrame, Field, FieldMatcher, FieldType, Vector } from '../../types';
import { ArrayVector } from '../../vector';
import { fieldMatchers } from '../matchers';
import { FieldMatcherID } from '../matchers/ids';

import { JoinMode } from './seriesToColumns';

export function pickBestJoinField(data: DataFrame[]): FieldMatcher {
const { timeField } = getTimeField(data[0]);
if (timeField) {
Expand Down Expand Up @@ -52,6 +56,11 @@ export interface JoinOptions {
* @internal -- used when we need to keep a reference to the original frame/field index
*/
keepOriginIndices?: boolean;

/**
* @internal -- Optionally specify a join mode (outer or inner)
*/
mode?: JoinMode;
}

function getJoinMatcher(options: JoinOptions): FieldMatcher {
Expand All @@ -77,7 +86,7 @@ export function maybeSortFrame(frame: DataFrame, fieldIdx: number) {
* This will return a single frame joined by the first matching field. When a join field is not specified,
* the default will use the first time field
*/
export function outerJoinDataFrames(options: JoinOptions): DataFrame | undefined {
export function joinDataFrames(options: JoinOptions): DataFrame | undefined {
if (!options.frames?.length) {
return;
}
Expand Down Expand Up @@ -211,7 +220,7 @@ export function outerJoinDataFrames(options: JoinOptions): DataFrame | undefined
allData.push(a);
}

const joined = join(allData, nullModes);
const joined = join(allData, nullModes, options.mode);

return {
// ...options.data[0], // keep name, meta?
Expand Down Expand Up @@ -272,16 +281,23 @@ function nullExpand(yVals: Array<number | null>, nullIdxs: number[], alignedLen:
}

// nullModes is a tables-matched array indicating how to treat nulls in each series
export function join(tables: AlignedData[], nullModes?: number[][]) {
const xVals = new Set<number>();

for (let ti = 0; ti < tables.length; ti++) {
let t = tables[ti];
let xs = t[0];
let len = xs.length;

for (let i = 0; i < len; i++) {
xVals.add(xs[i]);
export function join(tables: AlignedData[], nullModes?: number[][], mode: JoinMode = JoinMode.outer) {
let xVals: Set<number>;

if (mode === JoinMode.inner) {
// @ts-ignore
xVals = new Set(intersect(tables.map((t) => t[0])));
} else {
xVals = new Set();

for (let ti = 0; ti < tables.length; ti++) {
let t = tables[ti];
let xs = t[0];
let len = xs.length;

for (let i = 0; i < len; i++) {
xVals.add(xs[i]);
}
}
}

Expand Down

0 comments on commit a3c1cd8

Please sign in to comment.