From c527c218b93f83bc47b7cae60f8d037e72b9c6c5 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Tue, 27 Apr 2021 01:12:40 +0300 Subject: [PATCH 01/21] fix #5767 issue with DataFrame Merge method --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 17 ++++++++--------- .../DataFrameTests.cs | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index d5a1278371..381268dee2 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -252,9 +252,9 @@ public DataFrame Merge(DataFrame other, string leftJoinColumn, string righ // Hash the column with the smaller RowCount long leftRowCount = Rows.Count; long rightRowCount = other.Rows.Count; - DataFrame longerDataFrame = leftRowCount <= rightRowCount ? other : this; - DataFrame shorterDataFrame = ReferenceEquals(longerDataFrame, this) ? other : this; - DataFrameColumn hashColumn = (leftRowCount <= rightRowCount) ? Columns[leftJoinColumn] : other.Columns[rightJoinColumn]; + + var leftColumnIsSmaller = (leftRowCount <= rightRowCount); + DataFrameColumn hashColumn = leftColumnIsSmaller ? Columns[leftJoinColumn] : other.Columns[rightJoinColumn]; DataFrameColumn otherColumn = ReferenceEquals(hashColumn, Columns[leftJoinColumn]) ? other.Columns[rightJoinColumn] : Columns[leftJoinColumn]; Dictionary> multimap = hashColumn.GroupColumnValues(); @@ -270,23 +270,21 @@ public DataFrame Merge(DataFrame other, string leftJoinColumn, string righ { if (hashColumn[row] == null) { - leftRowIndices.Append(row); - rightRowIndices.Append(i); + leftRowIndices.Append(leftColumnIsSmaller ? row : i); + rightRowIndices.Append(leftColumnIsSmaller ? i : row); } } else { if (hashColumn[row] != null) { - leftRowIndices.Append(row); - rightRowIndices.Append(i); + leftRowIndices.Append(leftColumnIsSmaller ? row : i); + rightRowIndices.Append(leftColumnIsSmaller ? i : row); } } } } } - leftDataFrame = shorterDataFrame; - rightDataFrame = longerDataFrame; } else if (joinAlgorithm == JoinAlgorithm.FullOuter) { @@ -366,4 +364,5 @@ public DataFrame Merge(DataFrame other, string leftJoinColumn, string righ } } + } diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs index 300babbffb..72072fd533 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs @@ -1579,6 +1579,25 @@ public void TestSample() Assert.Throws(()=> df.Sample(13)); } + [Theory] + [InlineData(1, 2)] + [InlineData(2, 1)] + public void TestDataCorrectnessForInnerMerge(int leftCount, int rightCount) + { + DataFrame left = MakeDataFrameWithNumericColumns(leftCount, false); + DataFrameColumn leftStringColumn = new StringDataFrameColumn("String", Enumerable.Range(0, leftCount).Select(x => "Left")); + left.Columns.Insert(left.Columns.Count, leftStringColumn); + + DataFrame right = MakeDataFrameWithNumericColumns(rightCount, false); + DataFrameColumn rightStringColumn = new StringDataFrameColumn("String", Enumerable.Range(0, rightCount).Select(x => "Right")); + right.Columns.Insert(right.Columns.Count, rightStringColumn); + + DataFrame merge = left.Merge(right, "Int", "Int", joinAlgorithm: JoinAlgorithm.Inner); + + Assert.Equal("Left", (string)merge.Columns["String_left"][0]); + Assert.Equal("Right", (string)merge.Columns["String_right"][0]); + } + [Fact] public void TestMerge() { From 05807104b471661aba732af88599372423f7bdb5 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Fri, 28 May 2021 22:54:36 +0300 Subject: [PATCH 02/21] #5820 Extend DataFrame GroupBy operations --- src/Microsoft.Data.Analysis/DataFrame.cs | 22 ++++ src/Microsoft.Data.Analysis/GroupBy.cs | 11 +- src/Microsoft.Data.Analysis/Grouping.cs | 32 ++++++ src/Microsoft.Data.Analysis/Strings.resx | 3 + .../DataFrameGroupByTests.cs | 107 ++++++++++++++++++ 5 files changed, 174 insertions(+), 1 deletion(-) create mode 100644 src/Microsoft.Data.Analysis/Grouping.cs create mode 100644 test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs diff --git a/src/Microsoft.Data.Analysis/DataFrame.cs b/src/Microsoft.Data.Analysis/DataFrame.cs index 8eb04797aa..ea3ded0f83 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.cs @@ -367,6 +367,28 @@ public GroupBy GroupBy(string columnName) DataFrameColumn column = _columnCollection[columnIndex]; return column.GroupBy(columnIndex, this); } + + /// + /// Groups the rows of the by unique values in the column. + /// + /// Type of column used for grouping + /// The column used to group unique values + /// A GroupBy object that stores the group information. + public GroupBy GroupBy(string columnName) + { + int columnIndex = _columnCollection.IndexOf(columnName); + if (columnIndex == -1) + throw new ArgumentException(String.Format(Strings.InvalidColumnName, columnName), nameof(columnName)); + + DataFrameColumn column = _columnCollection[columnIndex]; + + var group = column.GroupBy(columnIndex, this) as GroupBy; + + if (group == null) + throw new InvalidCastException(String.Format(Strings.BadColumnCast, columnName, column.DataType, typeof(TKey))); + + return group; + } // In GroupBy and ReadCsv calls, columns get resized. We need to set the RowCount to reflect the true Length of the DataFrame. This does internal validation internal void SetTableRowCount(long rowCount) diff --git a/src/Microsoft.Data.Analysis/GroupBy.cs b/src/Microsoft.Data.Analysis/GroupBy.cs index 5d8013e9b6..acbdd3cbd7 100644 --- a/src/Microsoft.Data.Analysis/GroupBy.cs +++ b/src/Microsoft.Data.Analysis/GroupBy.cs @@ -3,7 +3,9 @@ // See the LICENSE file in the project root for more information. using System; +using System.Collections; using System.Collections.Generic; +using System.Linq; namespace Microsoft.Data.Analysis { @@ -70,7 +72,7 @@ public abstract class GroupBy public abstract DataFrame Mean(params string[] columnNames); } - public class GroupBy : GroupBy + public class GroupBy : GroupBy { private int _groupByColumnIndex; private IDictionary> _keyToRowIndicesMap; @@ -464,5 +466,12 @@ public override DataFrame Mean(params string[] columnNames) return ret; } + public IEnumerable> Groupings + { + get + { + return _keyToRowIndicesMap.Select(kvp => new Grouping(kvp.Key, kvp.Value.Select(index => _dataFrame.Rows[index]).ToArray())); + } + } } } diff --git a/src/Microsoft.Data.Analysis/Grouping.cs b/src/Microsoft.Data.Analysis/Grouping.cs new file mode 100644 index 0000000000..fcbd037568 --- /dev/null +++ b/src/Microsoft.Data.Analysis/Grouping.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Microsoft.Data.Analysis +{ + public class Grouping : IGrouping + { + private readonly TKey _key; + private readonly ICollection _rows; + + public Grouping(TKey key, ICollection rows) + { + _key = key; + _rows = rows; + } + + public TKey Key => _key; + + public IEnumerator GetEnumerator() + { + return _rows.GetEnumerator(); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return _rows.GetEnumerator(); + } + } +} diff --git a/src/Microsoft.Data.Analysis/Strings.resx b/src/Microsoft.Data.Analysis/Strings.resx index de91078cec..db6dfb4984 100644 --- a/src/Microsoft.Data.Analysis/Strings.resx +++ b/src/Microsoft.Data.Analysis/Strings.resx @@ -120,6 +120,9 @@ Cannot cast column holding {0} values to type {1} + + Cannot cast elements of column '{0}' type of {1} to type {2} used as TKey in grouping + Line {0} cannot be parsed with the current Delimiters. diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs new file mode 100644 index 0000000000..91325338bb --- /dev/null +++ b/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs @@ -0,0 +1,107 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Xunit; + +namespace Microsoft.Data.Analysis.Tests +{ + public class DataFrameGroupByTests + { + [Fact] + public void TestGroupingWithTKeyTypeofString() + { + int lenght = 11; + + //Create test dataframe (numbers starting from 0 up to lenght) + DataFrame df = MakeTestDataFrame(lenght); + + var grouping = df.GroupBy("Parity").Groupings; + + //Check groups count + Assert.Equal(2, grouping.Count()); + + //Check number of elements in each group + var oddGroup = grouping.Where(gr => gr.Key == "odd").FirstOrDefault(); + Assert.NotNull(oddGroup); + Assert.Equal(lenght/2, oddGroup.Count()); + + var evenGroup = grouping.Where(gr => gr.Key == "even").FirstOrDefault(); + Assert.NotNull(evenGroup); + Assert.Equal(lenght / 2 + lenght % 2, evenGroup.Count()); + + //Check corner cases + lenght = 0; + df = MakeTestDataFrame(lenght); + grouping = df.GroupBy("Parity").Groupings; + Assert.Empty(grouping); + + lenght = 1; + df = MakeTestDataFrame(lenght); + grouping = df.GroupBy("Parity").Groupings; + Assert.Single(grouping); + Assert.Equal("even", grouping.First().Key); + } + + [Fact] + public void TestGroupingWithTKeyPrimitiveType() + { + const int lenght = 55; + + //Create test dataframe (numbers starting from 0 up to lenght) + DataFrame df = MakeTestDataFrame(lenght); + + //Group elements by int column, that contain the amount of full tens in each int + var groupings = df.GroupBy("Tens").Groupings.ToDictionary(g => g.Key, g => g.ToList()); + + //Get the amount of all number based columns + int numberColumnsCount = df.Columns.Count - 2; //except "Parity" and "Tens" columns + + //Check each group + for (int i = 0; i < lenght / 10; i++) + { + Assert.Equal(10, groupings[i].Count()); + + var rows = groupings[i]; + for (int colIndex = 0; colIndex < numberColumnsCount; colIndex++) + { + var values = rows.Select(row => Convert.ToInt32(row[colIndex])); + + for (int j = 0; j < 10; j++) + { + Assert.Contains(i * 10 + j, values); + } + } + } + + //Last group should contain smaller amount of items + Assert.Equal(lenght % 10, groupings[lenght / 10].Count()); + } + + [Fact] + public void TestGroupingWithTKeyOfWrongType() + { + const int lenght = 5; + + var message = string.Empty; + + //Create test dataframe (numbers starting from 0 up to lenght) + DataFrame df = MakeTestDataFrame(lenght); + + //Use wrong type for grouping + Assert.Throws(() => df.GroupBy("Tens")); + } + + + private DataFrame MakeTestDataFrame(int length) + { + DataFrame df = DataFrameTests.MakeDataFrameWithNumericColumns(length, false); + DataFrameColumn parityColumn = new StringDataFrameColumn("Parity", Enumerable.Range(0, length).Select(x => x % 2 == 0 ? "even" : "odd")); + DataFrameColumn tensColumn = new Int32DataFrameColumn("Tens", Enumerable.Range(0, length).Select(x => x / 10)); + df.Columns.Insert(df.Columns.Count, parityColumn); + df.Columns.Insert(df.Columns.Count, tensColumn); + + return df; + } + } +} From f7658b2d65576f07853da870d03c121cbb099471 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Sat, 29 May 2021 12:58:21 +0300 Subject: [PATCH 03/21] #5820 fix code review findings --- src/Microsoft.Data.Analysis/GroupBy.cs | 33 +++++++++++++++-- src/Microsoft.Data.Analysis/Grouping.cs | 32 ---------------- .../DataFrameGroupByTests.cs | 37 ++++++++++++------- 3 files changed, 53 insertions(+), 49 deletions(-) delete mode 100644 src/Microsoft.Data.Analysis/Grouping.cs diff --git a/src/Microsoft.Data.Analysis/GroupBy.cs b/src/Microsoft.Data.Analysis/GroupBy.cs index acbdd3cbd7..4defc4ec3a 100644 --- a/src/Microsoft.Data.Analysis/GroupBy.cs +++ b/src/Microsoft.Data.Analysis/GroupBy.cs @@ -72,8 +72,35 @@ public abstract class GroupBy public abstract DataFrame Mean(params string[] columnNames); } - public class GroupBy : GroupBy + public class GroupBy : GroupBy { + #region Internal class that implements IGrouping LINQ interface + internal class Grouping : IGrouping + { + private readonly TKey _key; + private readonly IEnumerable _rows; + + public Grouping(TKey key, IEnumerable rows) + { + _key = key; + _rows = rows; + } + + public TKey Key => _key; + + public IEnumerator GetEnumerator() + { + return _rows.GetEnumerator(); + } + + IEnumerator IEnumerable.GetEnumerator() + { + return _rows.GetEnumerator(); + } + } + + #endregion + private int _groupByColumnIndex; private IDictionary> _keyToRowIndicesMap; private DataFrame _dataFrame; @@ -468,9 +495,9 @@ public override DataFrame Mean(params string[] columnNames) public IEnumerable> Groupings { - get + get { - return _keyToRowIndicesMap.Select(kvp => new Grouping(kvp.Key, kvp.Value.Select(index => _dataFrame.Rows[index]).ToArray())); + return _keyToRowIndicesMap.Select(kvp => new Grouping(kvp.Key, kvp.Value.Select(index => _dataFrame.Rows[index]))); } } } diff --git a/src/Microsoft.Data.Analysis/Grouping.cs b/src/Microsoft.Data.Analysis/Grouping.cs deleted file mode 100644 index fcbd037568..0000000000 --- a/src/Microsoft.Data.Analysis/Grouping.cs +++ /dev/null @@ -1,32 +0,0 @@ -using System; -using System.Collections; -using System.Collections.Generic; -using System.Linq; -using System.Text; - -namespace Microsoft.Data.Analysis -{ - public class Grouping : IGrouping - { - private readonly TKey _key; - private readonly ICollection _rows; - - public Grouping(TKey key, ICollection rows) - { - _key = key; - _rows = rows; - } - - public TKey Key => _key; - - public IEnumerator GetEnumerator() - { - return _rows.GetEnumerator(); - } - - IEnumerator IEnumerable.GetEnumerator() - { - return _rows.GetEnumerator(); - } - } -} diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs index 91325338bb..57bac29d6e 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs @@ -1,4 +1,8 @@ -using System; +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System; using System.Collections.Generic; using System.Linq; using System.Text; @@ -11,10 +15,10 @@ public class DataFrameGroupByTests [Fact] public void TestGroupingWithTKeyTypeofString() { - int lenght = 11; + const int lenght = 11; //Create test dataframe (numbers starting from 0 up to lenght) - DataFrame df = MakeTestDataFrame(lenght); + DataFrame df = MakeTestDataFrameWithParityAndTensColumns(lenght); var grouping = df.GroupBy("Parity").Groupings; @@ -30,26 +34,32 @@ public void TestGroupingWithTKeyTypeofString() Assert.NotNull(evenGroup); Assert.Equal(lenght / 2 + lenght % 2, evenGroup.Count()); + + } + + [Fact] + public void TestGroupingWithTKey_CornerCases() + { //Check corner cases - lenght = 0; - df = MakeTestDataFrame(lenght); - grouping = df.GroupBy("Parity").Groupings; + var df = MakeTestDataFrameWithParityAndTensColumns(0); + var grouping = df.GroupBy("Parity").Groupings; Assert.Empty(grouping); - lenght = 1; - df = MakeTestDataFrame(lenght); + + df = MakeTestDataFrameWithParityAndTensColumns(1); grouping = df.GroupBy("Parity").Groupings; Assert.Single(grouping); Assert.Equal("even", grouping.First().Key); } - + + [Fact] public void TestGroupingWithTKeyPrimitiveType() { const int lenght = 55; //Create test dataframe (numbers starting from 0 up to lenght) - DataFrame df = MakeTestDataFrame(lenght); + DataFrame df = MakeTestDataFrameWithParityAndTensColumns(lenght); //Group elements by int column, that contain the amount of full tens in each int var groupings = df.GroupBy("Tens").Groupings.ToDictionary(g => g.Key, g => g.ToList()); @@ -80,20 +90,19 @@ public void TestGroupingWithTKeyPrimitiveType() [Fact] public void TestGroupingWithTKeyOfWrongType() - { - const int lenght = 5; + { var message = string.Empty; //Create test dataframe (numbers starting from 0 up to lenght) - DataFrame df = MakeTestDataFrame(lenght); + DataFrame df = MakeTestDataFrameWithParityAndTensColumns(1); //Use wrong type for grouping Assert.Throws(() => df.GroupBy("Tens")); } - private DataFrame MakeTestDataFrame(int length) + private DataFrame MakeTestDataFrameWithParityAndTensColumns(int length) { DataFrame df = DataFrameTests.MakeDataFrameWithNumericColumns(length, false); DataFrameColumn parityColumn = new StringDataFrameColumn("Parity", Enumerable.Range(0, length).Select(x => x % 2 == 0 ? "even" : "odd")); From dc8daf0ffbcf2a26757daee988c2ddbeebf1e646 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Wed, 2 Jun 2021 16:49:25 +0300 Subject: [PATCH 04/21] Avoid code duplication in Merge DataFrame method (#5657) --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 284 +++++++----------- .../DataFrameTests.cs | 46 +-- 2 files changed, 137 insertions(+), 193 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index da99e6254f..7a0d58bd4a 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -142,220 +142,162 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right } // TODO: Merge API with an "On" parameter that merges on a column common to 2 dataframes - - /// - /// Merge DataFrames with a database style join - /// - /// - /// - /// - /// - /// - /// - /// - public DataFrame Merge(DataFrame other, string leftJoinColumn, string rightJoinColumn, string leftSuffix = "_left", string rightSuffix = "_right", JoinAlgorithm joinAlgorithm = JoinAlgorithm.Left) + + private static Dictionary Merge(DataFrame retainedDataFrame, DataFrame supplementaryDataFame, string retainedJoinColumnName, string supplemetaryJoinColumnName, out PrimitiveDataFrameColumn retainedRowIndices, out PrimitiveDataFrameColumn supplementaryRowIndices, bool isInner = false, bool calculateIntersection = false) { - // A simple hash join - DataFrame ret = new DataFrame(); - DataFrame leftDataFrame = this; - DataFrame rightDataFrame = other; + Dictionary intersection = calculateIntersection ? new Dictionary(EqualityComparer.Default) : null; - // The final table size is not known until runtime - long rowNumber = 0; - PrimitiveDataFrameColumn leftRowIndices = new PrimitiveDataFrameColumn("LeftIndices"); - PrimitiveDataFrameColumn rightRowIndices = new PrimitiveDataFrameColumn("RightIndices"); - if (joinAlgorithm == JoinAlgorithm.Left) - { - // First hash other dataframe on the rightJoinColumn - DataFrameColumn otherColumn = other.Columns[rightJoinColumn]; - Dictionary> multimap = otherColumn.GroupColumnValues(out HashSet otherColumnNullIndices); + retainedRowIndices = new PrimitiveDataFrameColumn("RetainedIndices"); + supplementaryRowIndices = new PrimitiveDataFrameColumn("SupplementaryIndices"); + + // First hash supplementary dataframe + DataFrameColumn supplementaryColumn = supplementaryDataFame.Columns[supplemetaryJoinColumnName]; + Dictionary> multimap = supplementaryColumn.GroupColumnValues(out HashSet supplementaryColumnNullIndices); - // Go over the records in this dataframe and match with the dictionary - DataFrameColumn thisColumn = Columns[leftJoinColumn]; + // Go over the records in this dataframe and match with the dictionary + DataFrameColumn retainedColumn = retainedDataFrame.Columns[retainedJoinColumnName]; - for (long i = 0; i < thisColumn.Length; i++) + for (long i = 0; i < retainedColumn.Length; i++) + { + var retainedValue = retainedColumn[i]; + if (retainedValue != null) { - var thisColumnValue = thisColumn[i]; - if (thisColumnValue != null) + //Get all rows from supplementary dataframe that sutisfy JOIN condition + if (multimap.TryGetValue((TKey)retainedValue, out ICollection rowIndices)) { - if (multimap.TryGetValue((TKey)thisColumnValue, out ICollection rowNumbers)) + foreach (long rowIndex in rowIndices) { - foreach (long row in rowNumbers) + retainedRowIndices.Append(i); + supplementaryRowIndices.Append(rowIndex); + + //store intersection if required + if (calculateIntersection) { - leftRowIndices.Append(i); - rightRowIndices.Append(row); + if (!intersection.ContainsKey((TKey)retainedValue)) + { + intersection.Add((TKey)retainedValue, rowIndex); + } } } - else - { - leftRowIndices.Append(i); - rightRowIndices.Append(null); - } } else { - foreach (long row in otherColumnNullIndices) - { - leftRowIndices.Append(i); - rightRowIndices.Append(row); - } + if (isInner) + continue; + + retainedRowIndices.Append(i); + supplementaryRowIndices.Append(null); } } - } - else if (joinAlgorithm == JoinAlgorithm.Right) - { - DataFrameColumn thisColumn = Columns[leftJoinColumn]; - Dictionary> multimap = thisColumn.GroupColumnValues(out HashSet thisColumnNullIndices); - - DataFrameColumn otherColumn = other.Columns[rightJoinColumn]; - for (long i = 0; i < otherColumn.Length; i++) + else { - var otherColumnValue = otherColumn[i]; - if (otherColumnValue != null) - { - if (multimap.TryGetValue((TKey)otherColumnValue, out ICollection rowNumbers)) - { - foreach (long row in rowNumbers) - { - leftRowIndices.Append(row); - rightRowIndices.Append(i); - } - } - else - { - leftRowIndices.Append(null); - rightRowIndices.Append(i); - } - } - else + foreach (long row in supplementaryColumnNullIndices) { - foreach (long thisColumnNullIndex in thisColumnNullIndices) - { - leftRowIndices.Append(thisColumnNullIndex); - rightRowIndices.Append(i); - } + retainedRowIndices.Append(i); + supplementaryRowIndices.Append(row); } } } + + return intersection; + } + + + /// + /// Merge DataFrames with a database style join + /// + /// + /// + /// + /// + /// + /// + /// + public DataFrame Merge(DataFrame other, string leftJoinColumn, string rightJoinColumn, string leftSuffix = "_left", string rightSuffix = "_right", JoinAlgorithm joinAlgorithm = JoinAlgorithm.Left) + { + //In Outer join the joined dataframe retains each row — even if no other matching row exists in supplementary dataframe. + //Outer joins subdivide further into left outer joins (left dataframe is retained), right outer joins (rightdataframe is retained), in full outer both are retained + + PrimitiveDataFrameColumn retainedRowIndices; + PrimitiveDataFrameColumn supplementaryRowIndices; + DataFrame supplementaryDataFrame; + DataFrame retainedDataFrame; + bool isLeftDataFrameRetained; + + if (joinAlgorithm == JoinAlgorithm.Left || joinAlgorithm == JoinAlgorithm.Right) + { + isLeftDataFrameRetained = (joinAlgorithm == JoinAlgorithm.Left); + + supplementaryDataFrame = isLeftDataFrameRetained ? other : this; + var supplementaryJoinColumn = isLeftDataFrameRetained ? rightJoinColumn : leftJoinColumn; + + retainedDataFrame = isLeftDataFrameRetained ? this : other; + var retainedJoinColumn = isLeftDataFrameRetained ? leftJoinColumn : rightJoinColumn; + + Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumn, supplementaryJoinColumn, out retainedRowIndices, out supplementaryRowIndices); + + } else if (joinAlgorithm == JoinAlgorithm.Inner) { - // Hash the column with the smaller RowCount - long leftRowCount = Rows.Count; - long rightRowCount = other.Rows.Count; + // use as supplementary (for Hashing) the dataframe with the smaller RowCount + isLeftDataFrameRetained = (Rows.Count > other.Rows.Count); - bool leftColumnIsSmaller = leftRowCount <= rightRowCount; - DataFrameColumn hashColumn = leftColumnIsSmaller ? Columns[leftJoinColumn] : other.Columns[rightJoinColumn]; - DataFrameColumn otherColumn = ReferenceEquals(hashColumn, Columns[leftJoinColumn]) ? other.Columns[rightJoinColumn] : Columns[leftJoinColumn]; - Dictionary> multimap = hashColumn.GroupColumnValues(out HashSet smallerDataFrameColumnNullIndices); + supplementaryDataFrame = isLeftDataFrameRetained ? other : this; + var supplementaryJoinColumn = isLeftDataFrameRetained ? rightJoinColumn : leftJoinColumn; - for (long i = 0; i < otherColumn.Length; i++) - { - var otherColumnValue = otherColumn[i]; - if (otherColumnValue != null) - { - if (multimap.TryGetValue((TKey)otherColumnValue, out ICollection rowNumbers)) - { - foreach (long row in rowNumbers) - { - leftRowIndices.Append(leftColumnIsSmaller ? row : i); - rightRowIndices.Append(leftColumnIsSmaller ? i : row); - } - } - } - else - { - foreach (long nullIndex in smallerDataFrameColumnNullIndices) - { - leftRowIndices.Append(leftColumnIsSmaller ? nullIndex : i); - rightRowIndices.Append(leftColumnIsSmaller ? i : nullIndex); - } - } - } + retainedDataFrame = isLeftDataFrameRetained ? this : other; + var retainedJoinColumn = isLeftDataFrameRetained ? leftJoinColumn : rightJoinColumn; + + Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumn, supplementaryJoinColumn, out retainedRowIndices, out supplementaryRowIndices, true); } else if (joinAlgorithm == JoinAlgorithm.FullOuter) { - DataFrameColumn otherColumn = other.Columns[rightJoinColumn]; - Dictionary> multimap = otherColumn.GroupColumnValues(out HashSet otherColumnNullIndices); - Dictionary intersection = new Dictionary(EqualityComparer.Default); + //In full outer join we would like to retain data from both side, so we do it into 2 steps: one first we do LEFT JOIN and then add lost data from the RIGHT side + + //Step 1 + //Do LEFT JOIN + isLeftDataFrameRetained = true; - // Go over the records in this dataframe and match with the dictionary - DataFrameColumn thisColumn = Columns[leftJoinColumn]; - Int64DataFrameColumn thisColumnNullIndices = new Int64DataFrameColumn("ThisColumnNullIndices"); + supplementaryDataFrame = isLeftDataFrameRetained ? other : this; + var supplementaryJoinColumn = isLeftDataFrameRetained ? rightJoinColumn : leftJoinColumn; - for (long i = 0; i < thisColumn.Length; i++) - { - var thisColumnValue = thisColumn[i]; - if (thisColumnValue != null) - { - if (multimap.TryGetValue((TKey)thisColumnValue, out ICollection rowNumbers)) - { - foreach (long row in rowNumbers) - { - leftRowIndices.Append(i); - rightRowIndices.Append(row); - if (!intersection.ContainsKey((TKey)thisColumnValue)) - { - intersection.Add((TKey)thisColumnValue, rowNumber); - } - } - } - else - { - leftRowIndices.Append(i); - rightRowIndices.Append(null); - } - } - else - { - thisColumnNullIndices.Append(i); - } - } - for (long i = 0; i < otherColumn.Length; i++) + retainedDataFrame = isLeftDataFrameRetained ? this : other; + var retainedJoinColumn = isLeftDataFrameRetained ? leftJoinColumn : rightJoinColumn; + + var intersection = Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumn, supplementaryJoinColumn, out retainedRowIndices, out supplementaryRowIndices, calculateIntersection: true); + + //Step 2 + //Do RIGHT JOIN to retain all data from supplementary DataFrame too (take into account data intersection from the first step to avoid duplicates) + DataFrameColumn supplementaryColumn = supplementaryDataFrame.Columns[supplementaryJoinColumn]; + + for (long i = 0; i < supplementaryColumn.Length; i++) { - var value = otherColumn[i]; + var value = supplementaryColumn[i]; if (value != null) { if (!intersection.ContainsKey((TKey)value)) { - leftRowIndices.Append(null); - rightRowIndices.Append(i); + retainedRowIndices.Append(null); + supplementaryRowIndices.Append(i); } } } - - // Now handle the null rows - foreach (long? thisColumnNullIndex in thisColumnNullIndices) - { - foreach (long otherColumnNullIndex in otherColumnNullIndices) - { - leftRowIndices.Append(thisColumnNullIndex.Value); - rightRowIndices.Append(otherColumnNullIndex); - } - if (otherColumnNullIndices.Count == 0) - { - leftRowIndices.Append(thisColumnNullIndex.Value); - rightRowIndices.Append(null); - } - } - if (thisColumnNullIndices.Length == 0) - { - foreach (long otherColumnNullIndex in otherColumnNullIndices) - { - leftRowIndices.Append(null); - rightRowIndices.Append(otherColumnNullIndex); - } - } } else throw new NotImplementedException(nameof(joinAlgorithm)); - - for (int i = 0; i < leftDataFrame.Columns.Count; i++) + + DataFrame ret = new DataFrame(); + + //insert columns from left dataframe (this) + for (int i = 0; i < this.Columns.Count; i++) { - ret.Columns.Insert(i, leftDataFrame.Columns[i].Clone(leftRowIndices)); + ret.Columns.Insert(i, this.Columns[i].Clone(isLeftDataFrameRetained ? retainedRowIndices : supplementaryRowIndices)); } - for (int i = 0; i < rightDataFrame.Columns.Count; i++) + + //insert columns from right dataframe (other) + for (int i = 0; i < other.Columns.Count; i++) { - DataFrameColumn column = rightDataFrame.Columns[i].Clone(rightRowIndices); + DataFrameColumn column = other.Columns[i].Clone(isLeftDataFrameRetained ? supplementaryRowIndices : retainedRowIndices); SetSuffixForDuplicatedColumnNames(ret, column, leftSuffix, rightSuffix); ret.Columns.Insert(ret.Columns.Count, column); } diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs index 34b676062a..6180d42b8e 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs @@ -1830,37 +1830,39 @@ public void TestMergeEdgeCases_Outer() DataFrame left = MakeDataFrameWithAllMutableColumnTypes(5); left["Int"][3] = null; DataFrame right = MakeDataFrameWithAllMutableColumnTypes(5); + right["Int"][1] = 5; + right["Int"][3] = null; + right["Int"][4] = 6; + // Creates this case: /* - * Left: Right: - * 0 0 - * 1 5 - * null(2) null(7) - * null(3) null(8) - * 4 6 + * Left: Right: RowIndex: + * 0 0 0 + * 1 5 1 + * null null 2 + * null(3) null(3) 3 + * 4 6 4 */ + /* * Merge will result in a DataFrame like: - * Int_Left Int_Right - * 0 0 - * 1 null - * 4 null - * null 5 - * null 6 - * null(2) null(7) - * null(2) null(8) - * null(3) null(7) - * null(3) null(8) + * Int_left: Int_right: Merged: Index: + * 0 0 0 - 0 0 + * 1 null 1 - N 1 + * null null 2 - 2 2 + * null null(3) 2 - 3 3 + * null(3) null 3 - 2 4 + * null(3) null(3) 3 - 3 5 + * 4 null 4 - N 6 + * null 5 N - 1 7 + * null 6 N - 4 8 */ - right["Int"][1] = 5; - right["Int"][3] = null; - right["Int"][4] = 6; DataFrame merge = left.Merge(right, "Int", "Int", joinAlgorithm: JoinAlgorithm.FullOuter); Assert.Equal(9, merge.Rows.Count); Assert.Equal(merge.Columns.Count, left.Columns.Count + right.Columns.Count); - int[] mergeRows = new int[] { 0, 5, 6, 7, 8 }; + int[] mergeRows = new int[] { 0, 2, 3, 4, 5 }; int[] leftRows = new int[] { 0, 2, 2, 3, 3 }; int[] rightRows = new int[] { 0, 2, 3, 2, 3 }; for (long i = 0; i < mergeRows.Length; i++) @@ -1871,7 +1873,7 @@ public void TestMergeEdgeCases_Outer() MatchRowsOnMergedDataFrame(merge, left, right, rowIndex, leftRowIndex, rightRowIndex); } - mergeRows = new int[] { 1, 2 }; + mergeRows = new int[] { 1, 6 }; leftRows = new int[] { 1, 4 }; for (long i = 0; i < mergeRows.Length; i++) { @@ -1880,7 +1882,7 @@ public void TestMergeEdgeCases_Outer() MatchRowsOnMergedDataFrame(merge, left, right, rowIndex, leftRowIndex, null); } - mergeRows = new int[] { 3, 4 }; + mergeRows = new int[] { 7, 8 }; rightRows = new int[] { 1, 4 }; for (long i = 0; i < mergeRows.Length; i++) { From a4735ce82a7860d5c355472b88fc0b05cd54d50b Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Fri, 4 Jun 2021 14:21:20 +0300 Subject: [PATCH 05/21] Add non generic DataFrame Merge method (#5657) --- .../ArrowStringDataFrameColumn.cs | 5 +++ src/Microsoft.Data.Analysis/DataFrame.Join.cs | 17 +++++---- .../DataFrameColumn.cs | 38 +++++++++++++++++++ .../PrimitiveDataFrameColumn.cs | 5 +++ .../StringDataFrameColumn.cs | 5 +++ 5 files changed, 63 insertions(+), 7 deletions(-) diff --git a/src/Microsoft.Data.Analysis/ArrowStringDataFrameColumn.cs b/src/Microsoft.Data.Analysis/ArrowStringDataFrameColumn.cs index d05912ca6f..5065009422 100644 --- a/src/Microsoft.Data.Analysis/ArrowStringDataFrameColumn.cs +++ b/src/Microsoft.Data.Analysis/ArrowStringDataFrameColumn.cs @@ -660,5 +660,10 @@ public ArrowStringDataFrameColumn Apply(Func func) } return ret; } + + public override Dictionary> GetGroupedOccurrences(DataFrameColumn other, out HashSet otherColumnNullIndices) + { + return GetGroupedOccurrences(other, out otherColumnNullIndices); + } } } diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index 7a0d58bd4a..75d645f4ff 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -149,21 +149,21 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right retainedRowIndices = new PrimitiveDataFrameColumn("RetainedIndices"); supplementaryRowIndices = new PrimitiveDataFrameColumn("SupplementaryIndices"); + - // First hash supplementary dataframe - DataFrameColumn supplementaryColumn = supplementaryDataFame.Columns[supplemetaryJoinColumnName]; - Dictionary> multimap = supplementaryColumn.GroupColumnValues(out HashSet supplementaryColumnNullIndices); - - // Go over the records in this dataframe and match with the dictionary + // Get occurrences of values in join column of the retained dataframe in join column of the supplementary dataframe DataFrameColumn retainedColumn = retainedDataFrame.Columns[retainedJoinColumnName]; + DataFrameColumn supplementaryColumn = supplementaryDataFame.Columns[supplemetaryJoinColumnName]; + var occurrences = retainedColumn.GetGroupedOccurrences(supplementaryColumn, out HashSet supplementaryColumnNullIndices); + for (long i = 0; i < retainedColumn.Length; i++) { var retainedValue = retainedColumn[i]; if (retainedValue != null) { - //Get all rows from supplementary dataframe that sutisfy JOIN condition - if (multimap.TryGetValue((TKey)retainedValue, out ICollection rowIndices)) + //Get all row indexes from supplementary dataframe that sutisfy JOIN condition + if (occurrences.TryGetValue(i, out ICollection rowIndices)) { foreach (long rowIndex in rowIndices) { @@ -215,6 +215,9 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right /// public DataFrame Merge(DataFrame other, string leftJoinColumn, string rightJoinColumn, string leftSuffix = "_left", string rightSuffix = "_right", JoinAlgorithm joinAlgorithm = JoinAlgorithm.Left) { + if (other == null) + throw new ArgumentNullException(nameof(other)); + //In Outer join the joined dataframe retains each row — even if no other matching row exists in supplementary dataframe. //Outer joins subdivide further into left outer joins (left dataframe is retained), right outer joins (rightdataframe is retained), in full outer both are retained diff --git a/src/Microsoft.Data.Analysis/DataFrameColumn.cs b/src/Microsoft.Data.Analysis/DataFrameColumn.cs index ac959ba290..d6296e96f1 100644 --- a/src/Microsoft.Data.Analysis/DataFrameColumn.cs +++ b/src/Microsoft.Data.Analysis/DataFrameColumn.cs @@ -210,6 +210,44 @@ public virtual DataFrameColumn Sort(bool ascending = true) /// A mapping of value() to the indices containing this value public virtual Dictionary> GroupColumnValues(out HashSet nullIndices) => throw new NotImplementedException(); + /// + /// Get occurences of each value from this column in other column, grouped by this value + /// + /// + /// + /// A mapping of index from this column to the indices of same value in other column + public abstract Dictionary> GetGroupedOccurrences(DataFrameColumn other, out HashSet otherColumnNullIndices); + + /// + /// Get occurences of each value from this column in other column, grouped by this value + /// + /// + /// + /// + /// A mapping of index from this column to the indices of same value in other column + protected Dictionary> GetGroupedOccurrences(DataFrameColumn other, out HashSet otherColumnNullIndices) + { + if (this.DataType != other.DataType) + throw new ArgumentException(); //TODO provide appropriate exception text + + // First hash other column + Dictionary> multimap = other.GroupColumnValues(out otherColumnNullIndices); + + var ret = new Dictionary>(); + + //For each value in this column find rows from other column with equal value + for (int i = 0; i < this.Length; i++) + { + var value = this[i]; + if (value != null && multimap.TryGetValue((TKey)value, out ICollection otherRowIndices)) + { + ret.Add(i, otherRowIndices); + } + } + + return ret; + } + /// /// Returns a DataFrame containing counts of unique values /// diff --git a/src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs b/src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs index 0c586c8cd7..8b44bca18f 100644 --- a/src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs +++ b/src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs @@ -810,5 +810,10 @@ protected internal override Delegate GetValueGetterUsingCursor(DataViewRowCursor { return cursor.GetGetter(schemaColumn); } + + public override Dictionary> GetGroupedOccurrences(DataFrameColumn other, out HashSet otherColumnNullIndices) + { + return GetGroupedOccurrences(other, out otherColumnNullIndices); + } } } diff --git a/src/Microsoft.Data.Analysis/StringDataFrameColumn.cs b/src/Microsoft.Data.Analysis/StringDataFrameColumn.cs index 4faea0ee58..59e8792081 100644 --- a/src/Microsoft.Data.Analysis/StringDataFrameColumn.cs +++ b/src/Microsoft.Data.Analysis/StringDataFrameColumn.cs @@ -511,5 +511,10 @@ protected internal override Delegate GetValueGetterUsingCursor(DataViewRowCursor { return cursor.GetGetter>(schemaColumn); } + + public override Dictionary> GetGroupedOccurrences(DataFrameColumn other, out HashSet otherColumnNullIndices) + { + return GetGroupedOccurrences(other, out otherColumnNullIndices); + } } } From 7893bfd7a75e14948a5b715ffca946e7b2634071 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Fri, 4 Jun 2021 22:17:08 +0300 Subject: [PATCH 06/21] Add support for multi columns join in DataFrame (#5657) --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 128 +++++++++++++----- .../DataFrameTests.cs | 126 ++++++++++++++++- 2 files changed, 220 insertions(+), 34 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index 75d645f4ff..26df600832 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; +using System.Linq; namespace Microsoft.Data.Analysis { @@ -140,42 +141,96 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right } return ret; } - - // TODO: Merge API with an "On" parameter that merges on a column common to 2 dataframes - - private static Dictionary Merge(DataFrame retainedDataFrame, DataFrame supplementaryDataFame, string retainedJoinColumnName, string supplemetaryJoinColumnName, out PrimitiveDataFrameColumn retainedRowIndices, out PrimitiveDataFrameColumn supplementaryRowIndices, bool isInner = false, bool calculateIntersection = false) + + private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supplementaryDataFrame, string[] retainedJoinColumnNames, string[] supplemetaryJoinColumnNames, out PrimitiveDataFrameColumn retainedRowIndices, out PrimitiveDataFrameColumn supplementaryRowIndices, bool isInner = false, bool calculateIntersection = false) { - Dictionary intersection = calculateIntersection ? new Dictionary(EqualityComparer.Default) : null; + if (retainedJoinColumnNames == null) + throw new ArgumentNullException(nameof(retainedJoinColumnNames)); - retainedRowIndices = new PrimitiveDataFrameColumn("RetainedIndices"); - supplementaryRowIndices = new PrimitiveDataFrameColumn("SupplementaryIndices"); - + if (supplemetaryJoinColumnNames == null) + throw new ArgumentNullException(nameof(supplemetaryJoinColumnNames)); - // Get occurrences of values in join column of the retained dataframe in join column of the supplementary dataframe - DataFrameColumn retainedColumn = retainedDataFrame.Columns[retainedJoinColumnName]; + if (retainedJoinColumnNames.Length != supplemetaryJoinColumnNames.Length) + throw new ArgumentException("", nameof(retainedJoinColumnNames)); //TODO provide correct message for the exception + - DataFrameColumn supplementaryColumn = supplementaryDataFame.Columns[supplemetaryJoinColumnName]; - var occurrences = retainedColumn.GetGroupedOccurrences(supplementaryColumn, out HashSet supplementaryColumnNullIndices); + HashSet intersection = calculateIntersection ? new HashSet() : null; + + Dictionary> occurrences = null; - for (long i = 0; i < retainedColumn.Length; i++) + // Get occurrences of values in columns used for join in the retained and supplementary dataframes + Dictionary retainedIndicesReverseMapping = null; + Dictionary> rowOccurrences = null; + + for (int colNameIndex = 0; colNameIndex < retainedJoinColumnNames.Length; colNameIndex++) { - var retainedValue = retainedColumn[i]; - if (retainedValue != null) + DataFrameColumn shrinkedRetainedColumn = retainedDataFrame.Columns[retainedJoinColumnNames[colNameIndex]]; + + //shrink retained column by row occurrences from previouse step + if (rowOccurrences != null) + { + var shrinkedRetainedIndices = rowOccurrences.Keys.ToArray(); + + var newRetainedIndicesReverseMapping = new Dictionary(); + for (int i = 0; i < shrinkedRetainedIndices.Length; i++) + { + //store reverse mapping to restore original dataframe indices from indices in shrinked row + newRetainedIndicesReverseMapping.Add(i, retainedIndicesReverseMapping != null ? retainedIndicesReverseMapping[shrinkedRetainedIndices[i]] : shrinkedRetainedIndices[i] ); + } + + retainedIndicesReverseMapping = newRetainedIndicesReverseMapping; + shrinkedRetainedColumn = shrinkedRetainedColumn.Clone(new Int64DataFrameColumn("Indices", shrinkedRetainedIndices)); + } + + DataFrameColumn supplementaryColumn = supplementaryDataFrame.Columns[supplemetaryJoinColumnNames[colNameIndex]]; + + var newOccurrences = shrinkedRetainedColumn.GetGroupedOccurrences(supplementaryColumn, out HashSet supplementaryColumnNullIndices); + + // shrink join result on current column by previouse join columns (if any) + if (rowOccurrences != null) + { + var shrinkedOccurences = new Dictionary>(); + + foreach (var kvp in newOccurrences) + { + var newValue = kvp.Value.Where(i => rowOccurrences[retainedIndicesReverseMapping[kvp.Key]].Contains(i)).ToArray(); + if (newValue.Any()) + { + shrinkedOccurences.Add(kvp.Key, newValue); + } + } + + newOccurrences = shrinkedOccurences; + } + + rowOccurrences = newOccurrences; + } + + //Restore occurences + occurrences = rowOccurrences.ToDictionary(kvp => retainedIndicesReverseMapping[kvp.Key], kvp => kvp.Value); + + retainedRowIndices = new Int64DataFrameColumn("RetainedIndices"); + supplementaryRowIndices = new Int64DataFrameColumn("SupplementaryIndices"); + + for (long i = 0; i < retainedDataFrame.Columns.RowCount; i++) + { + //retainedColumn[i]; //TODO check for null all joined columns + if (true) { //Get all row indexes from supplementary dataframe that sutisfy JOIN condition if (occurrences.TryGetValue(i, out ICollection rowIndices)) { - foreach (long rowIndex in rowIndices) + foreach (long supplementaryRowIndex in rowIndices) { retainedRowIndices.Append(i); - supplementaryRowIndices.Append(rowIndex); + supplementaryRowIndices.Append(supplementaryRowIndex); //store intersection if required if (calculateIntersection) { - if (!intersection.ContainsKey((TKey)retainedValue)) + if (!intersection.Contains(supplementaryRowIndex)) { - intersection.Add((TKey)retainedValue, rowIndex); + intersection.Add(supplementaryRowIndex); } } } @@ -191,20 +246,21 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right } else { + /* foreach (long row in supplementaryColumnNullIndices) { retainedRowIndices.Append(i); supplementaryRowIndices.Append(row); } + */ } } - + return intersection; } - /// - /// Merge DataFrames with a database style join + /// Merge DataFrames with a database style join (for backward compatibility) /// /// /// @@ -214,6 +270,12 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right /// /// public DataFrame Merge(DataFrame other, string leftJoinColumn, string rightJoinColumn, string leftSuffix = "_left", string rightSuffix = "_right", JoinAlgorithm joinAlgorithm = JoinAlgorithm.Left) + { + return Merge(other, new[] { leftJoinColumn }, new[] { rightJoinColumn }, leftSuffix, rightSuffix, joinAlgorithm); + } + + + public DataFrame Merge(DataFrame other, string[] leftJoinColumns, string[] rightJoinColumns, string leftSuffix = "_left", string rightSuffix = "_right", JoinAlgorithm joinAlgorithm = JoinAlgorithm.Left) { if (other == null) throw new ArgumentNullException(nameof(other)); @@ -232,12 +294,12 @@ public DataFrame Merge(DataFrame other, string leftJoinColumn, string righ isLeftDataFrameRetained = (joinAlgorithm == JoinAlgorithm.Left); supplementaryDataFrame = isLeftDataFrameRetained ? other : this; - var supplementaryJoinColumn = isLeftDataFrameRetained ? rightJoinColumn : leftJoinColumn; + var supplementaryJoinColumns = isLeftDataFrameRetained ? rightJoinColumns : leftJoinColumns; retainedDataFrame = isLeftDataFrameRetained ? this : other; - var retainedJoinColumn = isLeftDataFrameRetained ? leftJoinColumn : rightJoinColumn; + var retainedJoinColumns = isLeftDataFrameRetained ? leftJoinColumns : rightJoinColumns; - Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumn, supplementaryJoinColumn, out retainedRowIndices, out supplementaryRowIndices); + Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumns, supplementaryJoinColumns, out retainedRowIndices, out supplementaryRowIndices); } else if (joinAlgorithm == JoinAlgorithm.Inner) @@ -246,12 +308,12 @@ public DataFrame Merge(DataFrame other, string leftJoinColumn, string righ isLeftDataFrameRetained = (Rows.Count > other.Rows.Count); supplementaryDataFrame = isLeftDataFrameRetained ? other : this; - var supplementaryJoinColumn = isLeftDataFrameRetained ? rightJoinColumn : leftJoinColumn; + var supplementaryJoinColumns = isLeftDataFrameRetained ? rightJoinColumns : leftJoinColumns; retainedDataFrame = isLeftDataFrameRetained ? this : other; - var retainedJoinColumn = isLeftDataFrameRetained ? leftJoinColumn : rightJoinColumn; + var retainedJoinColumns = isLeftDataFrameRetained ? leftJoinColumns : rightJoinColumns; - Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumn, supplementaryJoinColumn, out retainedRowIndices, out supplementaryRowIndices, true); + Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumns, supplementaryJoinColumns, out retainedRowIndices, out supplementaryRowIndices, true); } else if (joinAlgorithm == JoinAlgorithm.FullOuter) { @@ -262,13 +324,14 @@ public DataFrame Merge(DataFrame other, string leftJoinColumn, string righ isLeftDataFrameRetained = true; supplementaryDataFrame = isLeftDataFrameRetained ? other : this; - var supplementaryJoinColumn = isLeftDataFrameRetained ? rightJoinColumn : leftJoinColumn; + var supplementaryJoinColumns = isLeftDataFrameRetained ? rightJoinColumns : leftJoinColumns; retainedDataFrame = isLeftDataFrameRetained ? this : other; - var retainedJoinColumn = isLeftDataFrameRetained ? leftJoinColumn : rightJoinColumn; + var retainedJoinColumns = isLeftDataFrameRetained ? leftJoinColumns : rightJoinColumns; - var intersection = Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumn, supplementaryJoinColumn, out retainedRowIndices, out supplementaryRowIndices, calculateIntersection: true); + var intersection = Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumns, supplementaryJoinColumns, out retainedRowIndices, out supplementaryRowIndices, calculateIntersection: true); + /* //Step 2 //Do RIGHT JOIN to retain all data from supplementary DataFrame too (take into account data intersection from the first step to avoid duplicates) DataFrameColumn supplementaryColumn = supplementaryDataFrame.Columns[supplementaryJoinColumn]; @@ -278,13 +341,14 @@ public DataFrame Merge(DataFrame other, string leftJoinColumn, string righ var value = supplementaryColumn[i]; if (value != null) { - if (!intersection.ContainsKey((TKey)value)) + if (!intersection.Contains(i)) { retainedRowIndices.Append(null); supplementaryRowIndices.Append(i); } } } + */ } else throw new NotImplementedException(nameof(joinAlgorithm)); diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs index 6180d42b8e..920d7e9bcd 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs @@ -1705,7 +1705,7 @@ public void TestMerge() Assert.Equal(merge.Columns["Int_right"][2], right.Columns["Int"][2]); VerifyMerge(merge, left, right, JoinAlgorithm.Inner); } - + private void MatchRowsOnMergedDataFrame(DataFrame merge, DataFrame left, DataFrame right, long mergeRow, long? leftRow, long? rightRow) { Assert.Equal(merge.Columns.Count, left.Columns.Count + right.Columns.Count); @@ -1775,6 +1775,7 @@ public void TestMergeEdgeCases_LeftOrRight(int leftLength, int rightLength, Join } } + [Fact] public void TestMergeEdgeCases_Inner() { @@ -1892,6 +1893,126 @@ public void TestMergeEdgeCases_Outer() } } + [Fact] + public void TestMerge_ByTwoJoinColumns_Complex_LeftJoin() + { + //Test left merge by to int type columns + var leftDataFrame = new DataFrame(); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2, 3, 4, 5 })); + leftDataFrame.Columns.Add (new Int32DataFrameColumn("G1", new[] { 0, 1, 1, 2, 2, 3 })); + leftDataFrame.Columns.Add (new Int32DataFrameColumn("G2", new[] { 3, 1, 2, 1, 2, 1})); + + var rightDataFrame = new DataFrame(); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2, 3 })); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 1, 2 })); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 2, 1, 1 })); + + // Creates this case: + /* ------------------------- + * Left | Right + * I G1 G2 | I G1 G2 + * ------------------------- + * 0 0 3 | 0 1 1 + * 1 1 1 | 1 1 2 + * 2 1 2 | 2 1 1 + * 3 2 1 | 3 2 1 + * 4 2 2 + * 5 3 1 + */ + + /* + * Merge will result in a DataFrame like: + * IL G1 G2 IR + * ------------------------- + * 0 0 3 + * 1 1 1 0 1 1 + * 1 1 1 2 1 1 + * 2 1 2 1 1 2 + * 3 2 1 3 2 1 + * 4 2 2 + * 5 3 1 + */ + + var merge = leftDataFrame.Merge(rightDataFrame, new[] { "G1", "G2" }, new[] { "G1", "G2" }); + + } + + [Fact] + public void TestMerge_ByTwoColumns_Simple_ManyToMany_LeftJoin() + { + //Test left merge by to int type columns + var leftDataFrame = new DataFrame(); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 3 })); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 3 })); + + var rightDataFrame = new DataFrame(); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1 })); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1 })); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1 })); + + // Creates this case: + /* --------------------------- + * Left | Right + * I G1 G2 | I G1 G2 + * --------------------------- + * 0 1 1 | 0 1 1 + * 1 1 1 | 1 1 1 + * 2 3 3 + */ + + /* + * Merge will result in a DataFrame like: + * IL G1 G2 IR + * ------------------------- + * 0 1 1 0 1 1 + * 0 1 1 1 1 1 + * 1 1 1 0 1 1 + * 1 1 1 1 1 1 + * 2 3 3 + */ + + var merge = leftDataFrame.Merge(rightDataFrame, new[] { "G1", "G2" }, new[] { "G1", "G2" }); + } + + [Fact] + public void TestMerge_ByThreeColumns_OneToOne_LeftJoin() + { + //Test left merge by to int type columns + var leftDataFrame = new DataFrame(); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 2 })); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 2, 1 })); + leftDataFrame.Columns.Add(new StringDataFrameColumn("G3", new[] { "A", "B", "C" })); + + var rightDataFrame = new DataFrame(); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 0, 1, 1 })); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 2 })); + rightDataFrame.Columns.Add(new StringDataFrameColumn("G3", new[] { "Z", "Y", "B" })); + + // Creates this case: + /* ----------------------------- + * Left | Right + * I G1 G2 G3 | I G1 G2 G3 + * ------------------------------ + * 0 1 1 A | 0 0 1 Z + * 1 1 2 B | 1 1 1 Y + * 2 2 1 C | 2 1 2 B + */ + + /* + * Merge will result in a DataFrame like: + * IL G1 G2 G3 IR + * ------------------------- + * 0 1 1 A + * 1 1 2 B 2 1 2 B + * 2 2 1 C + */ + + var merge = leftDataFrame.Merge(rightDataFrame, new[] { "G1", "G2", "G3" }, new[] { "G1", "G2", "G3" }); + } + [Fact] public void TestMerge_Issue5778() { @@ -2116,7 +2237,8 @@ public void TestClone(int dfLength, int intDfLength) } } } - + + [Fact] public void TestColumnCreationFromExisitingColumn() { From 28fcda32cdfb2cfcb5fa05a9913bebc2486a6ea8 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Sat, 5 Jun 2021 17:56:46 +0300 Subject: [PATCH 07/21] Fix failing tests for inner, left and right joins with nulls --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index 26df600832..16840c8b4a 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -141,6 +141,16 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right } return ret; } + + private static bool IsAnyNullValueInColumns (IReadOnlyCollection columns, long index) + { + foreach (var column in columns) + { + if (column[index] == null) + return true; + } + return false; + } private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supplementaryDataFrame, string[] retainedJoinColumnNames, string[] supplemetaryJoinColumnNames, out PrimitiveDataFrameColumn retainedRowIndices, out PrimitiveDataFrameColumn supplementaryRowIndices, bool isInner = false, bool calculateIntersection = false) { @@ -162,6 +172,8 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple Dictionary retainedIndicesReverseMapping = null; Dictionary> rowOccurrences = null; + HashSet supplementaryJoinColumnsNullIndices = new HashSet(); + for (int colNameIndex = 0; colNameIndex < retainedJoinColumnNames.Length; colNameIndex++) { DataFrameColumn shrinkedRetainedColumn = retainedDataFrame.Columns[retainedJoinColumnNames[colNameIndex]]; @@ -186,6 +198,8 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple var newOccurrences = shrinkedRetainedColumn.GetGroupedOccurrences(supplementaryColumn, out HashSet supplementaryColumnNullIndices); + supplementaryJoinColumnsNullIndices.UnionWith(supplementaryColumnNullIndices); + // shrink join result on current column by previouse join columns (if any) if (rowOccurrences != null) { @@ -199,23 +213,22 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple shrinkedOccurences.Add(kvp.Key, newValue); } } - newOccurrences = shrinkedOccurences; } - rowOccurrences = newOccurrences; } //Restore occurences - occurrences = rowOccurrences.ToDictionary(kvp => retainedIndicesReverseMapping[kvp.Key], kvp => kvp.Value); + occurrences = rowOccurrences.ToDictionary(kvp => retainedIndicesReverseMapping == null ? kvp.Key : retainedIndicesReverseMapping[kvp.Key], kvp => kvp.Value); retainedRowIndices = new Int64DataFrameColumn("RetainedIndices"); supplementaryRowIndices = new Int64DataFrameColumn("SupplementaryIndices"); - + + //Perform Merging + var retainJoinColumns = retainedJoinColumnNames.Select(name => retainedDataFrame.Columns[name]).ToArray(); for (long i = 0; i < retainedDataFrame.Columns.RowCount; i++) { - //retainedColumn[i]; //TODO check for null all joined columns - if (true) + if (!IsAnyNullValueInColumns(retainJoinColumns, i)) { //Get all row indexes from supplementary dataframe that sutisfy JOIN condition if (occurrences.TryGetValue(i, out ICollection rowIndices)) @@ -245,14 +258,13 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple } } else - { - /* - foreach (long row in supplementaryColumnNullIndices) + { + foreach (long row in supplementaryJoinColumnsNullIndices) { retainedRowIndices.Append(i); supplementaryRowIndices.Append(row); } - */ + } } From 5da5a373e82cc8b2abaab312b8ce8909fd80aed8 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Mon, 7 Jun 2021 12:09:41 +0300 Subject: [PATCH 08/21] #5657 fix DataFrame outer join failing tests --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index 16840c8b4a..4beb324a88 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -264,7 +264,6 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple retainedRowIndices.Append(i); supplementaryRowIndices.Append(row); } - } } @@ -342,16 +341,14 @@ public DataFrame Merge(DataFrame other, string[] leftJoinColumns, string[] right var retainedJoinColumns = isLeftDataFrameRetained ? leftJoinColumns : rightJoinColumns; var intersection = Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumns, supplementaryJoinColumns, out retainedRowIndices, out supplementaryRowIndices, calculateIntersection: true); - - /* + //Step 2 //Do RIGHT JOIN to retain all data from supplementary DataFrame too (take into account data intersection from the first step to avoid duplicates) - DataFrameColumn supplementaryColumn = supplementaryDataFrame.Columns[supplementaryJoinColumn]; - - for (long i = 0; i < supplementaryColumn.Length; i++) + + for (long i = 0; i < supplementaryDataFrame.Columns.RowCount; i++) { - var value = supplementaryColumn[i]; - if (value != null) + var columns = supplementaryJoinColumns.Select(name => supplementaryDataFrame.Columns[name]).ToArray(); + if (!IsAnyNullValueInColumns(columns, i)) { if (!intersection.Contains(i)) { @@ -360,7 +357,6 @@ public DataFrame Merge(DataFrame other, string[] leftJoinColumns, string[] right } } } - */ } else throw new NotImplementedException(nameof(joinAlgorithm)); From 040f6bb330e715e8ac5ea0ff37e6a6476dced95c Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Sat, 29 May 2021 12:58:21 +0300 Subject: [PATCH 09/21] rebase to the latest main --- test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs index fdbc859f7b..e718373f45 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs @@ -33,12 +33,11 @@ public void TestGroupingWithTKeyTypeofString() var evenGroup = grouping.Where(gr => gr.Key == "even").FirstOrDefault(); Assert.NotNull(evenGroup); Assert.Equal(length / 2 + length % 2, evenGroup.Count()); - - } [Fact] public void TestGroupingWithTKey_CornerCases() + { { //Check corner cases var df = MakeTestDataFrameWithParityAndTensColumns(0); From c1b796966370ee35f5ed08fc10f3dc9a9896a52d Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Wed, 2 Jun 2021 16:49:25 +0300 Subject: [PATCH 10/21] Avoid code duplication in Merge DataFrame method (#5657) --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 284 +++++++----------- .../DataFrameTests.cs | 46 +-- 2 files changed, 137 insertions(+), 193 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index da99e6254f..7a0d58bd4a 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -142,220 +142,162 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right } // TODO: Merge API with an "On" parameter that merges on a column common to 2 dataframes - - /// - /// Merge DataFrames with a database style join - /// - /// - /// - /// - /// - /// - /// - /// - public DataFrame Merge(DataFrame other, string leftJoinColumn, string rightJoinColumn, string leftSuffix = "_left", string rightSuffix = "_right", JoinAlgorithm joinAlgorithm = JoinAlgorithm.Left) + + private static Dictionary Merge(DataFrame retainedDataFrame, DataFrame supplementaryDataFame, string retainedJoinColumnName, string supplemetaryJoinColumnName, out PrimitiveDataFrameColumn retainedRowIndices, out PrimitiveDataFrameColumn supplementaryRowIndices, bool isInner = false, bool calculateIntersection = false) { - // A simple hash join - DataFrame ret = new DataFrame(); - DataFrame leftDataFrame = this; - DataFrame rightDataFrame = other; + Dictionary intersection = calculateIntersection ? new Dictionary(EqualityComparer.Default) : null; - // The final table size is not known until runtime - long rowNumber = 0; - PrimitiveDataFrameColumn leftRowIndices = new PrimitiveDataFrameColumn("LeftIndices"); - PrimitiveDataFrameColumn rightRowIndices = new PrimitiveDataFrameColumn("RightIndices"); - if (joinAlgorithm == JoinAlgorithm.Left) - { - // First hash other dataframe on the rightJoinColumn - DataFrameColumn otherColumn = other.Columns[rightJoinColumn]; - Dictionary> multimap = otherColumn.GroupColumnValues(out HashSet otherColumnNullIndices); + retainedRowIndices = new PrimitiveDataFrameColumn("RetainedIndices"); + supplementaryRowIndices = new PrimitiveDataFrameColumn("SupplementaryIndices"); + + // First hash supplementary dataframe + DataFrameColumn supplementaryColumn = supplementaryDataFame.Columns[supplemetaryJoinColumnName]; + Dictionary> multimap = supplementaryColumn.GroupColumnValues(out HashSet supplementaryColumnNullIndices); - // Go over the records in this dataframe and match with the dictionary - DataFrameColumn thisColumn = Columns[leftJoinColumn]; + // Go over the records in this dataframe and match with the dictionary + DataFrameColumn retainedColumn = retainedDataFrame.Columns[retainedJoinColumnName]; - for (long i = 0; i < thisColumn.Length; i++) + for (long i = 0; i < retainedColumn.Length; i++) + { + var retainedValue = retainedColumn[i]; + if (retainedValue != null) { - var thisColumnValue = thisColumn[i]; - if (thisColumnValue != null) + //Get all rows from supplementary dataframe that sutisfy JOIN condition + if (multimap.TryGetValue((TKey)retainedValue, out ICollection rowIndices)) { - if (multimap.TryGetValue((TKey)thisColumnValue, out ICollection rowNumbers)) + foreach (long rowIndex in rowIndices) { - foreach (long row in rowNumbers) + retainedRowIndices.Append(i); + supplementaryRowIndices.Append(rowIndex); + + //store intersection if required + if (calculateIntersection) { - leftRowIndices.Append(i); - rightRowIndices.Append(row); + if (!intersection.ContainsKey((TKey)retainedValue)) + { + intersection.Add((TKey)retainedValue, rowIndex); + } } } - else - { - leftRowIndices.Append(i); - rightRowIndices.Append(null); - } } else { - foreach (long row in otherColumnNullIndices) - { - leftRowIndices.Append(i); - rightRowIndices.Append(row); - } + if (isInner) + continue; + + retainedRowIndices.Append(i); + supplementaryRowIndices.Append(null); } } - } - else if (joinAlgorithm == JoinAlgorithm.Right) - { - DataFrameColumn thisColumn = Columns[leftJoinColumn]; - Dictionary> multimap = thisColumn.GroupColumnValues(out HashSet thisColumnNullIndices); - - DataFrameColumn otherColumn = other.Columns[rightJoinColumn]; - for (long i = 0; i < otherColumn.Length; i++) + else { - var otherColumnValue = otherColumn[i]; - if (otherColumnValue != null) - { - if (multimap.TryGetValue((TKey)otherColumnValue, out ICollection rowNumbers)) - { - foreach (long row in rowNumbers) - { - leftRowIndices.Append(row); - rightRowIndices.Append(i); - } - } - else - { - leftRowIndices.Append(null); - rightRowIndices.Append(i); - } - } - else + foreach (long row in supplementaryColumnNullIndices) { - foreach (long thisColumnNullIndex in thisColumnNullIndices) - { - leftRowIndices.Append(thisColumnNullIndex); - rightRowIndices.Append(i); - } + retainedRowIndices.Append(i); + supplementaryRowIndices.Append(row); } } } + + return intersection; + } + + + /// + /// Merge DataFrames with a database style join + /// + /// + /// + /// + /// + /// + /// + /// + public DataFrame Merge(DataFrame other, string leftJoinColumn, string rightJoinColumn, string leftSuffix = "_left", string rightSuffix = "_right", JoinAlgorithm joinAlgorithm = JoinAlgorithm.Left) + { + //In Outer join the joined dataframe retains each row — even if no other matching row exists in supplementary dataframe. + //Outer joins subdivide further into left outer joins (left dataframe is retained), right outer joins (rightdataframe is retained), in full outer both are retained + + PrimitiveDataFrameColumn retainedRowIndices; + PrimitiveDataFrameColumn supplementaryRowIndices; + DataFrame supplementaryDataFrame; + DataFrame retainedDataFrame; + bool isLeftDataFrameRetained; + + if (joinAlgorithm == JoinAlgorithm.Left || joinAlgorithm == JoinAlgorithm.Right) + { + isLeftDataFrameRetained = (joinAlgorithm == JoinAlgorithm.Left); + + supplementaryDataFrame = isLeftDataFrameRetained ? other : this; + var supplementaryJoinColumn = isLeftDataFrameRetained ? rightJoinColumn : leftJoinColumn; + + retainedDataFrame = isLeftDataFrameRetained ? this : other; + var retainedJoinColumn = isLeftDataFrameRetained ? leftJoinColumn : rightJoinColumn; + + Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumn, supplementaryJoinColumn, out retainedRowIndices, out supplementaryRowIndices); + + } else if (joinAlgorithm == JoinAlgorithm.Inner) { - // Hash the column with the smaller RowCount - long leftRowCount = Rows.Count; - long rightRowCount = other.Rows.Count; + // use as supplementary (for Hashing) the dataframe with the smaller RowCount + isLeftDataFrameRetained = (Rows.Count > other.Rows.Count); - bool leftColumnIsSmaller = leftRowCount <= rightRowCount; - DataFrameColumn hashColumn = leftColumnIsSmaller ? Columns[leftJoinColumn] : other.Columns[rightJoinColumn]; - DataFrameColumn otherColumn = ReferenceEquals(hashColumn, Columns[leftJoinColumn]) ? other.Columns[rightJoinColumn] : Columns[leftJoinColumn]; - Dictionary> multimap = hashColumn.GroupColumnValues(out HashSet smallerDataFrameColumnNullIndices); + supplementaryDataFrame = isLeftDataFrameRetained ? other : this; + var supplementaryJoinColumn = isLeftDataFrameRetained ? rightJoinColumn : leftJoinColumn; - for (long i = 0; i < otherColumn.Length; i++) - { - var otherColumnValue = otherColumn[i]; - if (otherColumnValue != null) - { - if (multimap.TryGetValue((TKey)otherColumnValue, out ICollection rowNumbers)) - { - foreach (long row in rowNumbers) - { - leftRowIndices.Append(leftColumnIsSmaller ? row : i); - rightRowIndices.Append(leftColumnIsSmaller ? i : row); - } - } - } - else - { - foreach (long nullIndex in smallerDataFrameColumnNullIndices) - { - leftRowIndices.Append(leftColumnIsSmaller ? nullIndex : i); - rightRowIndices.Append(leftColumnIsSmaller ? i : nullIndex); - } - } - } + retainedDataFrame = isLeftDataFrameRetained ? this : other; + var retainedJoinColumn = isLeftDataFrameRetained ? leftJoinColumn : rightJoinColumn; + + Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumn, supplementaryJoinColumn, out retainedRowIndices, out supplementaryRowIndices, true); } else if (joinAlgorithm == JoinAlgorithm.FullOuter) { - DataFrameColumn otherColumn = other.Columns[rightJoinColumn]; - Dictionary> multimap = otherColumn.GroupColumnValues(out HashSet otherColumnNullIndices); - Dictionary intersection = new Dictionary(EqualityComparer.Default); + //In full outer join we would like to retain data from both side, so we do it into 2 steps: one first we do LEFT JOIN and then add lost data from the RIGHT side + + //Step 1 + //Do LEFT JOIN + isLeftDataFrameRetained = true; - // Go over the records in this dataframe and match with the dictionary - DataFrameColumn thisColumn = Columns[leftJoinColumn]; - Int64DataFrameColumn thisColumnNullIndices = new Int64DataFrameColumn("ThisColumnNullIndices"); + supplementaryDataFrame = isLeftDataFrameRetained ? other : this; + var supplementaryJoinColumn = isLeftDataFrameRetained ? rightJoinColumn : leftJoinColumn; - for (long i = 0; i < thisColumn.Length; i++) - { - var thisColumnValue = thisColumn[i]; - if (thisColumnValue != null) - { - if (multimap.TryGetValue((TKey)thisColumnValue, out ICollection rowNumbers)) - { - foreach (long row in rowNumbers) - { - leftRowIndices.Append(i); - rightRowIndices.Append(row); - if (!intersection.ContainsKey((TKey)thisColumnValue)) - { - intersection.Add((TKey)thisColumnValue, rowNumber); - } - } - } - else - { - leftRowIndices.Append(i); - rightRowIndices.Append(null); - } - } - else - { - thisColumnNullIndices.Append(i); - } - } - for (long i = 0; i < otherColumn.Length; i++) + retainedDataFrame = isLeftDataFrameRetained ? this : other; + var retainedJoinColumn = isLeftDataFrameRetained ? leftJoinColumn : rightJoinColumn; + + var intersection = Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumn, supplementaryJoinColumn, out retainedRowIndices, out supplementaryRowIndices, calculateIntersection: true); + + //Step 2 + //Do RIGHT JOIN to retain all data from supplementary DataFrame too (take into account data intersection from the first step to avoid duplicates) + DataFrameColumn supplementaryColumn = supplementaryDataFrame.Columns[supplementaryJoinColumn]; + + for (long i = 0; i < supplementaryColumn.Length; i++) { - var value = otherColumn[i]; + var value = supplementaryColumn[i]; if (value != null) { if (!intersection.ContainsKey((TKey)value)) { - leftRowIndices.Append(null); - rightRowIndices.Append(i); + retainedRowIndices.Append(null); + supplementaryRowIndices.Append(i); } } } - - // Now handle the null rows - foreach (long? thisColumnNullIndex in thisColumnNullIndices) - { - foreach (long otherColumnNullIndex in otherColumnNullIndices) - { - leftRowIndices.Append(thisColumnNullIndex.Value); - rightRowIndices.Append(otherColumnNullIndex); - } - if (otherColumnNullIndices.Count == 0) - { - leftRowIndices.Append(thisColumnNullIndex.Value); - rightRowIndices.Append(null); - } - } - if (thisColumnNullIndices.Length == 0) - { - foreach (long otherColumnNullIndex in otherColumnNullIndices) - { - leftRowIndices.Append(null); - rightRowIndices.Append(otherColumnNullIndex); - } - } } else throw new NotImplementedException(nameof(joinAlgorithm)); - - for (int i = 0; i < leftDataFrame.Columns.Count; i++) + + DataFrame ret = new DataFrame(); + + //insert columns from left dataframe (this) + for (int i = 0; i < this.Columns.Count; i++) { - ret.Columns.Insert(i, leftDataFrame.Columns[i].Clone(leftRowIndices)); + ret.Columns.Insert(i, this.Columns[i].Clone(isLeftDataFrameRetained ? retainedRowIndices : supplementaryRowIndices)); } - for (int i = 0; i < rightDataFrame.Columns.Count; i++) + + //insert columns from right dataframe (other) + for (int i = 0; i < other.Columns.Count; i++) { - DataFrameColumn column = rightDataFrame.Columns[i].Clone(rightRowIndices); + DataFrameColumn column = other.Columns[i].Clone(isLeftDataFrameRetained ? supplementaryRowIndices : retainedRowIndices); SetSuffixForDuplicatedColumnNames(ret, column, leftSuffix, rightSuffix); ret.Columns.Insert(ret.Columns.Count, column); } diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs index 34b676062a..6180d42b8e 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs @@ -1830,37 +1830,39 @@ public void TestMergeEdgeCases_Outer() DataFrame left = MakeDataFrameWithAllMutableColumnTypes(5); left["Int"][3] = null; DataFrame right = MakeDataFrameWithAllMutableColumnTypes(5); + right["Int"][1] = 5; + right["Int"][3] = null; + right["Int"][4] = 6; + // Creates this case: /* - * Left: Right: - * 0 0 - * 1 5 - * null(2) null(7) - * null(3) null(8) - * 4 6 + * Left: Right: RowIndex: + * 0 0 0 + * 1 5 1 + * null null 2 + * null(3) null(3) 3 + * 4 6 4 */ + /* * Merge will result in a DataFrame like: - * Int_Left Int_Right - * 0 0 - * 1 null - * 4 null - * null 5 - * null 6 - * null(2) null(7) - * null(2) null(8) - * null(3) null(7) - * null(3) null(8) + * Int_left: Int_right: Merged: Index: + * 0 0 0 - 0 0 + * 1 null 1 - N 1 + * null null 2 - 2 2 + * null null(3) 2 - 3 3 + * null(3) null 3 - 2 4 + * null(3) null(3) 3 - 3 5 + * 4 null 4 - N 6 + * null 5 N - 1 7 + * null 6 N - 4 8 */ - right["Int"][1] = 5; - right["Int"][3] = null; - right["Int"][4] = 6; DataFrame merge = left.Merge(right, "Int", "Int", joinAlgorithm: JoinAlgorithm.FullOuter); Assert.Equal(9, merge.Rows.Count); Assert.Equal(merge.Columns.Count, left.Columns.Count + right.Columns.Count); - int[] mergeRows = new int[] { 0, 5, 6, 7, 8 }; + int[] mergeRows = new int[] { 0, 2, 3, 4, 5 }; int[] leftRows = new int[] { 0, 2, 2, 3, 3 }; int[] rightRows = new int[] { 0, 2, 3, 2, 3 }; for (long i = 0; i < mergeRows.Length; i++) @@ -1871,7 +1873,7 @@ public void TestMergeEdgeCases_Outer() MatchRowsOnMergedDataFrame(merge, left, right, rowIndex, leftRowIndex, rightRowIndex); } - mergeRows = new int[] { 1, 2 }; + mergeRows = new int[] { 1, 6 }; leftRows = new int[] { 1, 4 }; for (long i = 0; i < mergeRows.Length; i++) { @@ -1880,7 +1882,7 @@ public void TestMergeEdgeCases_Outer() MatchRowsOnMergedDataFrame(merge, left, right, rowIndex, leftRowIndex, null); } - mergeRows = new int[] { 3, 4 }; + mergeRows = new int[] { 7, 8 }; rightRows = new int[] { 1, 4 }; for (long i = 0; i < mergeRows.Length; i++) { From bd22036f09a830aa471dd5c5ac2c12c10517df89 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Fri, 4 Jun 2021 14:21:20 +0300 Subject: [PATCH 11/21] Add non generic DataFrame Merge method (#5657) --- .../ArrowStringDataFrameColumn.cs | 5 +++ src/Microsoft.Data.Analysis/DataFrame.Join.cs | 17 +++++---- .../DataFrameColumn.cs | 38 +++++++++++++++++++ .../PrimitiveDataFrameColumn.cs | 5 +++ .../StringDataFrameColumn.cs | 5 +++ 5 files changed, 63 insertions(+), 7 deletions(-) diff --git a/src/Microsoft.Data.Analysis/ArrowStringDataFrameColumn.cs b/src/Microsoft.Data.Analysis/ArrowStringDataFrameColumn.cs index d05912ca6f..5065009422 100644 --- a/src/Microsoft.Data.Analysis/ArrowStringDataFrameColumn.cs +++ b/src/Microsoft.Data.Analysis/ArrowStringDataFrameColumn.cs @@ -660,5 +660,10 @@ public ArrowStringDataFrameColumn Apply(Func func) } return ret; } + + public override Dictionary> GetGroupedOccurrences(DataFrameColumn other, out HashSet otherColumnNullIndices) + { + return GetGroupedOccurrences(other, out otherColumnNullIndices); + } } } diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index 7a0d58bd4a..75d645f4ff 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -149,21 +149,21 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right retainedRowIndices = new PrimitiveDataFrameColumn("RetainedIndices"); supplementaryRowIndices = new PrimitiveDataFrameColumn("SupplementaryIndices"); + - // First hash supplementary dataframe - DataFrameColumn supplementaryColumn = supplementaryDataFame.Columns[supplemetaryJoinColumnName]; - Dictionary> multimap = supplementaryColumn.GroupColumnValues(out HashSet supplementaryColumnNullIndices); - - // Go over the records in this dataframe and match with the dictionary + // Get occurrences of values in join column of the retained dataframe in join column of the supplementary dataframe DataFrameColumn retainedColumn = retainedDataFrame.Columns[retainedJoinColumnName]; + DataFrameColumn supplementaryColumn = supplementaryDataFame.Columns[supplemetaryJoinColumnName]; + var occurrences = retainedColumn.GetGroupedOccurrences(supplementaryColumn, out HashSet supplementaryColumnNullIndices); + for (long i = 0; i < retainedColumn.Length; i++) { var retainedValue = retainedColumn[i]; if (retainedValue != null) { - //Get all rows from supplementary dataframe that sutisfy JOIN condition - if (multimap.TryGetValue((TKey)retainedValue, out ICollection rowIndices)) + //Get all row indexes from supplementary dataframe that sutisfy JOIN condition + if (occurrences.TryGetValue(i, out ICollection rowIndices)) { foreach (long rowIndex in rowIndices) { @@ -215,6 +215,9 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right /// public DataFrame Merge(DataFrame other, string leftJoinColumn, string rightJoinColumn, string leftSuffix = "_left", string rightSuffix = "_right", JoinAlgorithm joinAlgorithm = JoinAlgorithm.Left) { + if (other == null) + throw new ArgumentNullException(nameof(other)); + //In Outer join the joined dataframe retains each row — even if no other matching row exists in supplementary dataframe. //Outer joins subdivide further into left outer joins (left dataframe is retained), right outer joins (rightdataframe is retained), in full outer both are retained diff --git a/src/Microsoft.Data.Analysis/DataFrameColumn.cs b/src/Microsoft.Data.Analysis/DataFrameColumn.cs index ac959ba290..d6296e96f1 100644 --- a/src/Microsoft.Data.Analysis/DataFrameColumn.cs +++ b/src/Microsoft.Data.Analysis/DataFrameColumn.cs @@ -210,6 +210,44 @@ public virtual DataFrameColumn Sort(bool ascending = true) /// A mapping of value() to the indices containing this value public virtual Dictionary> GroupColumnValues(out HashSet nullIndices) => throw new NotImplementedException(); + /// + /// Get occurences of each value from this column in other column, grouped by this value + /// + /// + /// + /// A mapping of index from this column to the indices of same value in other column + public abstract Dictionary> GetGroupedOccurrences(DataFrameColumn other, out HashSet otherColumnNullIndices); + + /// + /// Get occurences of each value from this column in other column, grouped by this value + /// + /// + /// + /// + /// A mapping of index from this column to the indices of same value in other column + protected Dictionary> GetGroupedOccurrences(DataFrameColumn other, out HashSet otherColumnNullIndices) + { + if (this.DataType != other.DataType) + throw new ArgumentException(); //TODO provide appropriate exception text + + // First hash other column + Dictionary> multimap = other.GroupColumnValues(out otherColumnNullIndices); + + var ret = new Dictionary>(); + + //For each value in this column find rows from other column with equal value + for (int i = 0; i < this.Length; i++) + { + var value = this[i]; + if (value != null && multimap.TryGetValue((TKey)value, out ICollection otherRowIndices)) + { + ret.Add(i, otherRowIndices); + } + } + + return ret; + } + /// /// Returns a DataFrame containing counts of unique values /// diff --git a/src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs b/src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs index 0c586c8cd7..8b44bca18f 100644 --- a/src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs +++ b/src/Microsoft.Data.Analysis/PrimitiveDataFrameColumn.cs @@ -810,5 +810,10 @@ protected internal override Delegate GetValueGetterUsingCursor(DataViewRowCursor { return cursor.GetGetter(schemaColumn); } + + public override Dictionary> GetGroupedOccurrences(DataFrameColumn other, out HashSet otherColumnNullIndices) + { + return GetGroupedOccurrences(other, out otherColumnNullIndices); + } } } diff --git a/src/Microsoft.Data.Analysis/StringDataFrameColumn.cs b/src/Microsoft.Data.Analysis/StringDataFrameColumn.cs index 4faea0ee58..59e8792081 100644 --- a/src/Microsoft.Data.Analysis/StringDataFrameColumn.cs +++ b/src/Microsoft.Data.Analysis/StringDataFrameColumn.cs @@ -511,5 +511,10 @@ protected internal override Delegate GetValueGetterUsingCursor(DataViewRowCursor { return cursor.GetGetter>(schemaColumn); } + + public override Dictionary> GetGroupedOccurrences(DataFrameColumn other, out HashSet otherColumnNullIndices) + { + return GetGroupedOccurrences(other, out otherColumnNullIndices); + } } } From d2d5f36c4ceaa9960c81feb972053652719758b2 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Fri, 4 Jun 2021 22:17:08 +0300 Subject: [PATCH 12/21] Add support for multi columns join in DataFrame (#5657) --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 128 +++++++++++++----- .../DataFrameTests.cs | 126 ++++++++++++++++- 2 files changed, 220 insertions(+), 34 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index 75d645f4ff..26df600832 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Generic; +using System.Linq; namespace Microsoft.Data.Analysis { @@ -140,42 +141,96 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right } return ret; } - - // TODO: Merge API with an "On" parameter that merges on a column common to 2 dataframes - - private static Dictionary Merge(DataFrame retainedDataFrame, DataFrame supplementaryDataFame, string retainedJoinColumnName, string supplemetaryJoinColumnName, out PrimitiveDataFrameColumn retainedRowIndices, out PrimitiveDataFrameColumn supplementaryRowIndices, bool isInner = false, bool calculateIntersection = false) + + private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supplementaryDataFrame, string[] retainedJoinColumnNames, string[] supplemetaryJoinColumnNames, out PrimitiveDataFrameColumn retainedRowIndices, out PrimitiveDataFrameColumn supplementaryRowIndices, bool isInner = false, bool calculateIntersection = false) { - Dictionary intersection = calculateIntersection ? new Dictionary(EqualityComparer.Default) : null; + if (retainedJoinColumnNames == null) + throw new ArgumentNullException(nameof(retainedJoinColumnNames)); - retainedRowIndices = new PrimitiveDataFrameColumn("RetainedIndices"); - supplementaryRowIndices = new PrimitiveDataFrameColumn("SupplementaryIndices"); - + if (supplemetaryJoinColumnNames == null) + throw new ArgumentNullException(nameof(supplemetaryJoinColumnNames)); - // Get occurrences of values in join column of the retained dataframe in join column of the supplementary dataframe - DataFrameColumn retainedColumn = retainedDataFrame.Columns[retainedJoinColumnName]; + if (retainedJoinColumnNames.Length != supplemetaryJoinColumnNames.Length) + throw new ArgumentException("", nameof(retainedJoinColumnNames)); //TODO provide correct message for the exception + - DataFrameColumn supplementaryColumn = supplementaryDataFame.Columns[supplemetaryJoinColumnName]; - var occurrences = retainedColumn.GetGroupedOccurrences(supplementaryColumn, out HashSet supplementaryColumnNullIndices); + HashSet intersection = calculateIntersection ? new HashSet() : null; + + Dictionary> occurrences = null; - for (long i = 0; i < retainedColumn.Length; i++) + // Get occurrences of values in columns used for join in the retained and supplementary dataframes + Dictionary retainedIndicesReverseMapping = null; + Dictionary> rowOccurrences = null; + + for (int colNameIndex = 0; colNameIndex < retainedJoinColumnNames.Length; colNameIndex++) { - var retainedValue = retainedColumn[i]; - if (retainedValue != null) + DataFrameColumn shrinkedRetainedColumn = retainedDataFrame.Columns[retainedJoinColumnNames[colNameIndex]]; + + //shrink retained column by row occurrences from previouse step + if (rowOccurrences != null) + { + var shrinkedRetainedIndices = rowOccurrences.Keys.ToArray(); + + var newRetainedIndicesReverseMapping = new Dictionary(); + for (int i = 0; i < shrinkedRetainedIndices.Length; i++) + { + //store reverse mapping to restore original dataframe indices from indices in shrinked row + newRetainedIndicesReverseMapping.Add(i, retainedIndicesReverseMapping != null ? retainedIndicesReverseMapping[shrinkedRetainedIndices[i]] : shrinkedRetainedIndices[i] ); + } + + retainedIndicesReverseMapping = newRetainedIndicesReverseMapping; + shrinkedRetainedColumn = shrinkedRetainedColumn.Clone(new Int64DataFrameColumn("Indices", shrinkedRetainedIndices)); + } + + DataFrameColumn supplementaryColumn = supplementaryDataFrame.Columns[supplemetaryJoinColumnNames[colNameIndex]]; + + var newOccurrences = shrinkedRetainedColumn.GetGroupedOccurrences(supplementaryColumn, out HashSet supplementaryColumnNullIndices); + + // shrink join result on current column by previouse join columns (if any) + if (rowOccurrences != null) + { + var shrinkedOccurences = new Dictionary>(); + + foreach (var kvp in newOccurrences) + { + var newValue = kvp.Value.Where(i => rowOccurrences[retainedIndicesReverseMapping[kvp.Key]].Contains(i)).ToArray(); + if (newValue.Any()) + { + shrinkedOccurences.Add(kvp.Key, newValue); + } + } + + newOccurrences = shrinkedOccurences; + } + + rowOccurrences = newOccurrences; + } + + //Restore occurences + occurrences = rowOccurrences.ToDictionary(kvp => retainedIndicesReverseMapping[kvp.Key], kvp => kvp.Value); + + retainedRowIndices = new Int64DataFrameColumn("RetainedIndices"); + supplementaryRowIndices = new Int64DataFrameColumn("SupplementaryIndices"); + + for (long i = 0; i < retainedDataFrame.Columns.RowCount; i++) + { + //retainedColumn[i]; //TODO check for null all joined columns + if (true) { //Get all row indexes from supplementary dataframe that sutisfy JOIN condition if (occurrences.TryGetValue(i, out ICollection rowIndices)) { - foreach (long rowIndex in rowIndices) + foreach (long supplementaryRowIndex in rowIndices) { retainedRowIndices.Append(i); - supplementaryRowIndices.Append(rowIndex); + supplementaryRowIndices.Append(supplementaryRowIndex); //store intersection if required if (calculateIntersection) { - if (!intersection.ContainsKey((TKey)retainedValue)) + if (!intersection.Contains(supplementaryRowIndex)) { - intersection.Add((TKey)retainedValue, rowIndex); + intersection.Add(supplementaryRowIndex); } } } @@ -191,20 +246,21 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right } else { + /* foreach (long row in supplementaryColumnNullIndices) { retainedRowIndices.Append(i); supplementaryRowIndices.Append(row); } + */ } } - + return intersection; } - /// - /// Merge DataFrames with a database style join + /// Merge DataFrames with a database style join (for backward compatibility) /// /// /// @@ -214,6 +270,12 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right /// /// public DataFrame Merge(DataFrame other, string leftJoinColumn, string rightJoinColumn, string leftSuffix = "_left", string rightSuffix = "_right", JoinAlgorithm joinAlgorithm = JoinAlgorithm.Left) + { + return Merge(other, new[] { leftJoinColumn }, new[] { rightJoinColumn }, leftSuffix, rightSuffix, joinAlgorithm); + } + + + public DataFrame Merge(DataFrame other, string[] leftJoinColumns, string[] rightJoinColumns, string leftSuffix = "_left", string rightSuffix = "_right", JoinAlgorithm joinAlgorithm = JoinAlgorithm.Left) { if (other == null) throw new ArgumentNullException(nameof(other)); @@ -232,12 +294,12 @@ public DataFrame Merge(DataFrame other, string leftJoinColumn, string righ isLeftDataFrameRetained = (joinAlgorithm == JoinAlgorithm.Left); supplementaryDataFrame = isLeftDataFrameRetained ? other : this; - var supplementaryJoinColumn = isLeftDataFrameRetained ? rightJoinColumn : leftJoinColumn; + var supplementaryJoinColumns = isLeftDataFrameRetained ? rightJoinColumns : leftJoinColumns; retainedDataFrame = isLeftDataFrameRetained ? this : other; - var retainedJoinColumn = isLeftDataFrameRetained ? leftJoinColumn : rightJoinColumn; + var retainedJoinColumns = isLeftDataFrameRetained ? leftJoinColumns : rightJoinColumns; - Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumn, supplementaryJoinColumn, out retainedRowIndices, out supplementaryRowIndices); + Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumns, supplementaryJoinColumns, out retainedRowIndices, out supplementaryRowIndices); } else if (joinAlgorithm == JoinAlgorithm.Inner) @@ -246,12 +308,12 @@ public DataFrame Merge(DataFrame other, string leftJoinColumn, string righ isLeftDataFrameRetained = (Rows.Count > other.Rows.Count); supplementaryDataFrame = isLeftDataFrameRetained ? other : this; - var supplementaryJoinColumn = isLeftDataFrameRetained ? rightJoinColumn : leftJoinColumn; + var supplementaryJoinColumns = isLeftDataFrameRetained ? rightJoinColumns : leftJoinColumns; retainedDataFrame = isLeftDataFrameRetained ? this : other; - var retainedJoinColumn = isLeftDataFrameRetained ? leftJoinColumn : rightJoinColumn; + var retainedJoinColumns = isLeftDataFrameRetained ? leftJoinColumns : rightJoinColumns; - Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumn, supplementaryJoinColumn, out retainedRowIndices, out supplementaryRowIndices, true); + Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumns, supplementaryJoinColumns, out retainedRowIndices, out supplementaryRowIndices, true); } else if (joinAlgorithm == JoinAlgorithm.FullOuter) { @@ -262,13 +324,14 @@ public DataFrame Merge(DataFrame other, string leftJoinColumn, string righ isLeftDataFrameRetained = true; supplementaryDataFrame = isLeftDataFrameRetained ? other : this; - var supplementaryJoinColumn = isLeftDataFrameRetained ? rightJoinColumn : leftJoinColumn; + var supplementaryJoinColumns = isLeftDataFrameRetained ? rightJoinColumns : leftJoinColumns; retainedDataFrame = isLeftDataFrameRetained ? this : other; - var retainedJoinColumn = isLeftDataFrameRetained ? leftJoinColumn : rightJoinColumn; + var retainedJoinColumns = isLeftDataFrameRetained ? leftJoinColumns : rightJoinColumns; - var intersection = Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumn, supplementaryJoinColumn, out retainedRowIndices, out supplementaryRowIndices, calculateIntersection: true); + var intersection = Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumns, supplementaryJoinColumns, out retainedRowIndices, out supplementaryRowIndices, calculateIntersection: true); + /* //Step 2 //Do RIGHT JOIN to retain all data from supplementary DataFrame too (take into account data intersection from the first step to avoid duplicates) DataFrameColumn supplementaryColumn = supplementaryDataFrame.Columns[supplementaryJoinColumn]; @@ -278,13 +341,14 @@ public DataFrame Merge(DataFrame other, string leftJoinColumn, string righ var value = supplementaryColumn[i]; if (value != null) { - if (!intersection.ContainsKey((TKey)value)) + if (!intersection.Contains(i)) { retainedRowIndices.Append(null); supplementaryRowIndices.Append(i); } } } + */ } else throw new NotImplementedException(nameof(joinAlgorithm)); diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs index 6180d42b8e..920d7e9bcd 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs @@ -1705,7 +1705,7 @@ public void TestMerge() Assert.Equal(merge.Columns["Int_right"][2], right.Columns["Int"][2]); VerifyMerge(merge, left, right, JoinAlgorithm.Inner); } - + private void MatchRowsOnMergedDataFrame(DataFrame merge, DataFrame left, DataFrame right, long mergeRow, long? leftRow, long? rightRow) { Assert.Equal(merge.Columns.Count, left.Columns.Count + right.Columns.Count); @@ -1775,6 +1775,7 @@ public void TestMergeEdgeCases_LeftOrRight(int leftLength, int rightLength, Join } } + [Fact] public void TestMergeEdgeCases_Inner() { @@ -1892,6 +1893,126 @@ public void TestMergeEdgeCases_Outer() } } + [Fact] + public void TestMerge_ByTwoJoinColumns_Complex_LeftJoin() + { + //Test left merge by to int type columns + var leftDataFrame = new DataFrame(); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2, 3, 4, 5 })); + leftDataFrame.Columns.Add (new Int32DataFrameColumn("G1", new[] { 0, 1, 1, 2, 2, 3 })); + leftDataFrame.Columns.Add (new Int32DataFrameColumn("G2", new[] { 3, 1, 2, 1, 2, 1})); + + var rightDataFrame = new DataFrame(); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2, 3 })); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 1, 2 })); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 2, 1, 1 })); + + // Creates this case: + /* ------------------------- + * Left | Right + * I G1 G2 | I G1 G2 + * ------------------------- + * 0 0 3 | 0 1 1 + * 1 1 1 | 1 1 2 + * 2 1 2 | 2 1 1 + * 3 2 1 | 3 2 1 + * 4 2 2 + * 5 3 1 + */ + + /* + * Merge will result in a DataFrame like: + * IL G1 G2 IR + * ------------------------- + * 0 0 3 + * 1 1 1 0 1 1 + * 1 1 1 2 1 1 + * 2 1 2 1 1 2 + * 3 2 1 3 2 1 + * 4 2 2 + * 5 3 1 + */ + + var merge = leftDataFrame.Merge(rightDataFrame, new[] { "G1", "G2" }, new[] { "G1", "G2" }); + + } + + [Fact] + public void TestMerge_ByTwoColumns_Simple_ManyToMany_LeftJoin() + { + //Test left merge by to int type columns + var leftDataFrame = new DataFrame(); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 3 })); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 3 })); + + var rightDataFrame = new DataFrame(); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1 })); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1 })); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1 })); + + // Creates this case: + /* --------------------------- + * Left | Right + * I G1 G2 | I G1 G2 + * --------------------------- + * 0 1 1 | 0 1 1 + * 1 1 1 | 1 1 1 + * 2 3 3 + */ + + /* + * Merge will result in a DataFrame like: + * IL G1 G2 IR + * ------------------------- + * 0 1 1 0 1 1 + * 0 1 1 1 1 1 + * 1 1 1 0 1 1 + * 1 1 1 1 1 1 + * 2 3 3 + */ + + var merge = leftDataFrame.Merge(rightDataFrame, new[] { "G1", "G2" }, new[] { "G1", "G2" }); + } + + [Fact] + public void TestMerge_ByThreeColumns_OneToOne_LeftJoin() + { + //Test left merge by to int type columns + var leftDataFrame = new DataFrame(); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 2 })); + leftDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 2, 1 })); + leftDataFrame.Columns.Add(new StringDataFrameColumn("G3", new[] { "A", "B", "C" })); + + var rightDataFrame = new DataFrame(); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 0, 1, 1 })); + rightDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 2 })); + rightDataFrame.Columns.Add(new StringDataFrameColumn("G3", new[] { "Z", "Y", "B" })); + + // Creates this case: + /* ----------------------------- + * Left | Right + * I G1 G2 G3 | I G1 G2 G3 + * ------------------------------ + * 0 1 1 A | 0 0 1 Z + * 1 1 2 B | 1 1 1 Y + * 2 2 1 C | 2 1 2 B + */ + + /* + * Merge will result in a DataFrame like: + * IL G1 G2 G3 IR + * ------------------------- + * 0 1 1 A + * 1 1 2 B 2 1 2 B + * 2 2 1 C + */ + + var merge = leftDataFrame.Merge(rightDataFrame, new[] { "G1", "G2", "G3" }, new[] { "G1", "G2", "G3" }); + } + [Fact] public void TestMerge_Issue5778() { @@ -2116,7 +2237,8 @@ public void TestClone(int dfLength, int intDfLength) } } } - + + [Fact] public void TestColumnCreationFromExisitingColumn() { From 0494f7c14367f6a5e35758167a69614e1c31c0a8 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Sat, 5 Jun 2021 17:56:46 +0300 Subject: [PATCH 13/21] Fix failing tests for inner, left and right joins with nulls --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 32 +++++++++++++------ 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index 26df600832..16840c8b4a 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -141,6 +141,16 @@ public DataFrame Join(DataFrame other, string leftSuffix = "_left", string right } return ret; } + + private static bool IsAnyNullValueInColumns (IReadOnlyCollection columns, long index) + { + foreach (var column in columns) + { + if (column[index] == null) + return true; + } + return false; + } private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supplementaryDataFrame, string[] retainedJoinColumnNames, string[] supplemetaryJoinColumnNames, out PrimitiveDataFrameColumn retainedRowIndices, out PrimitiveDataFrameColumn supplementaryRowIndices, bool isInner = false, bool calculateIntersection = false) { @@ -162,6 +172,8 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple Dictionary retainedIndicesReverseMapping = null; Dictionary> rowOccurrences = null; + HashSet supplementaryJoinColumnsNullIndices = new HashSet(); + for (int colNameIndex = 0; colNameIndex < retainedJoinColumnNames.Length; colNameIndex++) { DataFrameColumn shrinkedRetainedColumn = retainedDataFrame.Columns[retainedJoinColumnNames[colNameIndex]]; @@ -186,6 +198,8 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple var newOccurrences = shrinkedRetainedColumn.GetGroupedOccurrences(supplementaryColumn, out HashSet supplementaryColumnNullIndices); + supplementaryJoinColumnsNullIndices.UnionWith(supplementaryColumnNullIndices); + // shrink join result on current column by previouse join columns (if any) if (rowOccurrences != null) { @@ -199,23 +213,22 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple shrinkedOccurences.Add(kvp.Key, newValue); } } - newOccurrences = shrinkedOccurences; } - rowOccurrences = newOccurrences; } //Restore occurences - occurrences = rowOccurrences.ToDictionary(kvp => retainedIndicesReverseMapping[kvp.Key], kvp => kvp.Value); + occurrences = rowOccurrences.ToDictionary(kvp => retainedIndicesReverseMapping == null ? kvp.Key : retainedIndicesReverseMapping[kvp.Key], kvp => kvp.Value); retainedRowIndices = new Int64DataFrameColumn("RetainedIndices"); supplementaryRowIndices = new Int64DataFrameColumn("SupplementaryIndices"); - + + //Perform Merging + var retainJoinColumns = retainedJoinColumnNames.Select(name => retainedDataFrame.Columns[name]).ToArray(); for (long i = 0; i < retainedDataFrame.Columns.RowCount; i++) { - //retainedColumn[i]; //TODO check for null all joined columns - if (true) + if (!IsAnyNullValueInColumns(retainJoinColumns, i)) { //Get all row indexes from supplementary dataframe that sutisfy JOIN condition if (occurrences.TryGetValue(i, out ICollection rowIndices)) @@ -245,14 +258,13 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple } } else - { - /* - foreach (long row in supplementaryColumnNullIndices) + { + foreach (long row in supplementaryJoinColumnsNullIndices) { retainedRowIndices.Append(i); supplementaryRowIndices.Append(row); } - */ + } } From 976351c789065d846475e4ce5936830bd7cddf3a Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Mon, 7 Jun 2021 12:09:41 +0300 Subject: [PATCH 14/21] #5657 fix DataFrame outer join failing tests --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index 16840c8b4a..4beb324a88 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -264,7 +264,6 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple retainedRowIndices.Append(i); supplementaryRowIndices.Append(row); } - } } @@ -342,16 +341,14 @@ public DataFrame Merge(DataFrame other, string[] leftJoinColumns, string[] right var retainedJoinColumns = isLeftDataFrameRetained ? leftJoinColumns : rightJoinColumns; var intersection = Merge(retainedDataFrame, supplementaryDataFrame, retainedJoinColumns, supplementaryJoinColumns, out retainedRowIndices, out supplementaryRowIndices, calculateIntersection: true); - - /* + //Step 2 //Do RIGHT JOIN to retain all data from supplementary DataFrame too (take into account data intersection from the first step to avoid duplicates) - DataFrameColumn supplementaryColumn = supplementaryDataFrame.Columns[supplementaryJoinColumn]; - - for (long i = 0; i < supplementaryColumn.Length; i++) + + for (long i = 0; i < supplementaryDataFrame.Columns.RowCount; i++) { - var value = supplementaryColumn[i]; - if (value != null) + var columns = supplementaryJoinColumns.Select(name => supplementaryDataFrame.Columns[name]).ToArray(); + if (!IsAnyNullValueInColumns(columns, i)) { if (!intersection.Contains(i)) { @@ -360,7 +357,6 @@ public DataFrame Merge(DataFrame other, string[] leftJoinColumns, string[] right } } } - */ } else throw new NotImplementedException(nameof(joinAlgorithm)); From b5bff6cf600e02a29af64b1413262b121ea8b886 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Mon, 7 Jun 2021 12:28:44 +0300 Subject: [PATCH 15/21] #5657 fix merge issues --- test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs index e718373f45..d9016f2aa1 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs @@ -37,8 +37,7 @@ public void TestGroupingWithTKeyTypeofString() [Fact] public void TestGroupingWithTKey_CornerCases() - { - { + { //Check corner cases var df = MakeTestDataFrameWithParityAndTensColumns(0); var grouping = df.GroupBy("Parity").Groupings; @@ -100,7 +99,6 @@ public void TestGroupingWithTKeyOfWrongType() Assert.Throws(() => df.GroupBy("Tens")); } - private DataFrame MakeTestDataFrameWithParityAndTensColumns(int length) { DataFrame df = DataFrameTests.MakeDataFrameWithNumericColumns(length, false); From 1940f3a234722aa8a6c7a29302c00be6060a3ebc Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Mon, 7 Jun 2021 13:03:16 +0300 Subject: [PATCH 16/21] Minor changes #5657 --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 1 - src/Microsoft.Data.Analysis/DataFrame.cs | 1 - .../DataFrameGroupByTests.cs | 17 ++++++++--------- .../DataFrameTests.cs | 4 +--- 4 files changed, 9 insertions(+), 14 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index 4beb324a88..c43a92e04c 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -344,7 +344,6 @@ public DataFrame Merge(DataFrame other, string[] leftJoinColumns, string[] right //Step 2 //Do RIGHT JOIN to retain all data from supplementary DataFrame too (take into account data intersection from the first step to avoid duplicates) - for (long i = 0; i < supplementaryDataFrame.Columns.RowCount; i++) { var columns = supplementaryJoinColumns.Select(name => supplementaryDataFrame.Columns[name]).ToArray(); diff --git a/src/Microsoft.Data.Analysis/DataFrame.cs b/src/Microsoft.Data.Analysis/DataFrame.cs index cbd48ec06e..b3e04bf7b6 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.cs @@ -384,7 +384,6 @@ public GroupBy GroupBy(string columnName) throw new InvalidCastException(String.Format(Strings.BadColumnCastDuringGrouping, columnName, column.DataType, typeof(TKey))); } - return group; } diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs index 57bac29d6e..2319a9b64f 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrameGroupByTests.cs @@ -15,10 +15,10 @@ public class DataFrameGroupByTests [Fact] public void TestGroupingWithTKeyTypeofString() { - const int lenght = 11; + const int length = 11; //Create test dataframe (numbers starting from 0 up to lenght) - DataFrame df = MakeTestDataFrameWithParityAndTensColumns(lenght); + DataFrame df = MakeTestDataFrameWithParityAndTensColumns(length); var grouping = df.GroupBy("Parity").Groupings; @@ -28,11 +28,11 @@ public void TestGroupingWithTKeyTypeofString() //Check number of elements in each group var oddGroup = grouping.Where(gr => gr.Key == "odd").FirstOrDefault(); Assert.NotNull(oddGroup); - Assert.Equal(lenght/2, oddGroup.Count()); + Assert.Equal(length/2, oddGroup.Count()); var evenGroup = grouping.Where(gr => gr.Key == "even").FirstOrDefault(); Assert.NotNull(evenGroup); - Assert.Equal(lenght / 2 + lenght % 2, evenGroup.Count()); + Assert.Equal(length / 2 + length % 2, evenGroup.Count()); } @@ -56,10 +56,10 @@ public void TestGroupingWithTKey_CornerCases() [Fact] public void TestGroupingWithTKeyPrimitiveType() { - const int lenght = 55; + const int length = 55; //Create test dataframe (numbers starting from 0 up to lenght) - DataFrame df = MakeTestDataFrameWithParityAndTensColumns(lenght); + DataFrame df = MakeTestDataFrameWithParityAndTensColumns(length); //Group elements by int column, that contain the amount of full tens in each int var groupings = df.GroupBy("Tens").Groupings.ToDictionary(g => g.Key, g => g.ToList()); @@ -68,7 +68,7 @@ public void TestGroupingWithTKeyPrimitiveType() int numberColumnsCount = df.Columns.Count - 2; //except "Parity" and "Tens" columns //Check each group - for (int i = 0; i < lenght / 10; i++) + for (int i = 0; i < length / 10; i++) { Assert.Equal(10, groupings[i].Count()); @@ -85,13 +85,12 @@ public void TestGroupingWithTKeyPrimitiveType() } //Last group should contain smaller amount of items - Assert.Equal(lenght % 10, groupings[lenght / 10].Count()); + Assert.Equal(length % 10, groupings[length / 10].Count()); } [Fact] public void TestGroupingWithTKeyOfWrongType() { - var message = string.Empty; //Create test dataframe (numbers starting from 0 up to lenght) diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs index 920d7e9bcd..e05990ecaf 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs @@ -1705,7 +1705,7 @@ public void TestMerge() Assert.Equal(merge.Columns["Int_right"][2], right.Columns["Int"][2]); VerifyMerge(merge, left, right, JoinAlgorithm.Inner); } - + private void MatchRowsOnMergedDataFrame(DataFrame merge, DataFrame left, DataFrame right, long mergeRow, long? leftRow, long? rightRow) { Assert.Equal(merge.Columns.Count, left.Columns.Count + right.Columns.Count); @@ -1775,7 +1775,6 @@ public void TestMergeEdgeCases_LeftOrRight(int leftLength, int rightLength, Join } } - [Fact] public void TestMergeEdgeCases_Inner() { @@ -2237,7 +2236,6 @@ public void TestClone(int dfLength, int intDfLength) } } } - [Fact] public void TestColumnCreationFromExisitingColumn() From 4232fee9826f85220d94f117f87518bf96e19034 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Mon, 7 Jun 2021 17:11:28 +0300 Subject: [PATCH 17/21] Add self explanatory exception text (#5657) --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 2 +- src/Microsoft.Data.Analysis/DataFrameColumn.cs | 2 +- src/Microsoft.Data.Analysis/Strings.Designer.cs | 11 ++++++++++- src/Microsoft.Data.Analysis/Strings.resx | 3 +++ 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index c43a92e04c..8ee0536d21 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -161,7 +161,7 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple throw new ArgumentNullException(nameof(supplemetaryJoinColumnNames)); if (retainedJoinColumnNames.Length != supplemetaryJoinColumnNames.Length) - throw new ArgumentException("", nameof(retainedJoinColumnNames)); //TODO provide correct message for the exception + throw new ArgumentException(Strings.MismatchedArrayLengths, nameof(retainedJoinColumnNames)); HashSet intersection = calculateIntersection ? new HashSet() : null; diff --git a/src/Microsoft.Data.Analysis/DataFrameColumn.cs b/src/Microsoft.Data.Analysis/DataFrameColumn.cs index d6296e96f1..88415d8bdc 100644 --- a/src/Microsoft.Data.Analysis/DataFrameColumn.cs +++ b/src/Microsoft.Data.Analysis/DataFrameColumn.cs @@ -228,7 +228,7 @@ public virtual DataFrameColumn Sort(bool ascending = true) protected Dictionary> GetGroupedOccurrences(DataFrameColumn other, out HashSet otherColumnNullIndices) { if (this.DataType != other.DataType) - throw new ArgumentException(); //TODO provide appropriate exception text + throw new ArgumentException(String.Format(Strings.MismatchedColumnValueType, this.DataType), nameof(other)); // First hash other column Dictionary> multimap = other.GroupColumnValues(out otherColumnNullIndices); diff --git a/src/Microsoft.Data.Analysis/Strings.Designer.cs b/src/Microsoft.Data.Analysis/Strings.Designer.cs index 4b24665bf3..9cbf90f38e 100644 --- a/src/Microsoft.Data.Analysis/Strings.Designer.cs +++ b/src/Microsoft.Data.Analysis/Strings.Designer.cs @@ -268,7 +268,7 @@ internal class Strings { } /// - /// Looks up a localized string similar to Column does not exist. + /// Looks up a localized string similar to Column '{0}' does not exist. /// internal static string InvalidColumnName { get { @@ -312,6 +312,15 @@ internal class Strings { } } + /// + /// Looks up a localized string similar to Array lengths are mistmached. + /// + internal static string MismatchedArrayLengths { + get { + return ResourceManager.GetString("MismatchedArrayLengths", resourceCulture); + } + } + /// /// Looks up a localized string similar to Column lengths are mismatched. /// diff --git a/src/Microsoft.Data.Analysis/Strings.resx b/src/Microsoft.Data.Analysis/Strings.resx index 8975448708..79764037cc 100644 --- a/src/Microsoft.Data.Analysis/Strings.resx +++ b/src/Microsoft.Data.Analysis/Strings.resx @@ -201,6 +201,9 @@ MapIndices exceeds column length + + Array lengths are mistmached + Column lengths are mismatched From 33a99c36e935805e6ea81786aa380cccd0c628d9 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Mon, 7 Jun 2021 17:55:21 +0300 Subject: [PATCH 18/21] Add Asserts to new unit tests (#5657) --- .../DataFrameTests.cs | 162 ++++++++++++------ 1 file changed, 111 insertions(+), 51 deletions(-) diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs index e05990ecaf..6d897a0b9f 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs @@ -1861,7 +1861,7 @@ public void TestMergeEdgeCases_Outer() DataFrame merge = left.Merge(right, "Int", "Int", joinAlgorithm: JoinAlgorithm.FullOuter); Assert.Equal(9, merge.Rows.Count); Assert.Equal(merge.Columns.Count, left.Columns.Count + right.Columns.Count); - + int[] mergeRows = new int[] { 0, 2, 3, 4, 5 }; int[] leftRows = new int[] { 0, 2, 2, 3, 3 }; int[] rightRows = new int[] { 0, 2, 3, 2, 3 }; @@ -1893,18 +1893,20 @@ public void TestMergeEdgeCases_Outer() } [Fact] - public void TestMerge_ByTwoJoinColumns_Complex_LeftJoin() + public void TestMerge_ByTwoColumns_Complex_LeftJoin() { //Test left merge by to int type columns - var leftDataFrame = new DataFrame(); - leftDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2, 3, 4, 5 })); - leftDataFrame.Columns.Add (new Int32DataFrameColumn("G1", new[] { 0, 1, 1, 2, 2, 3 })); - leftDataFrame.Columns.Add (new Int32DataFrameColumn("G2", new[] { 3, 1, 2, 1, 2, 1})); - var rightDataFrame = new DataFrame(); - rightDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2, 3 })); - rightDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 1, 2 })); - rightDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 2, 1, 1 })); + //Arrange + var left = new DataFrame(); + left.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2, 3, 4, 5 })); + left.Columns.Add (new Int32DataFrameColumn("G1", new[] { 0, 1, 1, 2, 2, 3 })); + left.Columns.Add (new Int32DataFrameColumn("G2", new[] { 3, 1, 2, 1, 2, 1})); + + var right = new DataFrame(); + right.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2, 3 })); + right.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 1, 2 })); + right.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 2, 1, 1 })); // Creates this case: /* ------------------------- @@ -1921,18 +1923,38 @@ public void TestMerge_ByTwoJoinColumns_Complex_LeftJoin() /* * Merge will result in a DataFrame like: - * IL G1 G2 IR + * IL G1 G2 IR Merged: * ------------------------- - * 0 0 3 - * 1 1 1 0 1 1 - * 1 1 1 2 1 1 - * 2 1 2 1 1 2 - * 3 2 1 3 2 1 - * 4 2 2 - * 5 3 1 + * 0 0 3 0 - N + * 1 1 1 0 1 1 1 - 0 + * 1 1 1 2 1 1 1 - 2 + * 2 1 2 1 1 2 2 - 1 + * 3 2 1 3 2 1 3 - 3 + * 4 2 2 4 - N + * 5 3 1 5 - N */ - var merge = leftDataFrame.Merge(rightDataFrame, new[] { "G1", "G2" }, new[] { "G1", "G2" }); + //Act + var merge = left.Merge(right, new[] { "G1", "G2" }, new[] { "G1", "G2" }); + + //Assert + var expectedMerged = new (int? Left, int? Right)[] { + (0, null), + (1, 0), + (1, 2), + (2, 1), + (3, 3), + (4, null), + (5, null) + }; + + Assert.Equal(expectedMerged.Length, merge.Rows.Count); + Assert.Equal(merge.Columns.Count, left.Columns.Count + right.Columns.Count); + + for (long i = 0; i < expectedMerged.Length; i++) + { + MatchRowsOnMergedDataFrame(merge, left, right, i, expectedMerged[i].Left, expectedMerged[i].Right); + } } @@ -1940,15 +1962,17 @@ public void TestMerge_ByTwoJoinColumns_Complex_LeftJoin() public void TestMerge_ByTwoColumns_Simple_ManyToMany_LeftJoin() { //Test left merge by to int type columns - var leftDataFrame = new DataFrame(); - leftDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); - leftDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 3 })); - leftDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 3 })); + + //Arrange + var left = new DataFrame(); + left.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + left.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 3 })); + left.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 3 })); - var rightDataFrame = new DataFrame(); - rightDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1 })); - rightDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1 })); - rightDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1 })); + var right = new DataFrame(); + right.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1 })); + right.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1 })); + right.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1 })); // Creates this case: /* --------------------------- @@ -1962,33 +1986,53 @@ public void TestMerge_ByTwoColumns_Simple_ManyToMany_LeftJoin() /* * Merge will result in a DataFrame like: - * IL G1 G2 IR + * IL G1 G2 IR Merged: * ------------------------- - * 0 1 1 0 1 1 - * 0 1 1 1 1 1 - * 1 1 1 0 1 1 - * 1 1 1 1 1 1 - * 2 3 3 + * 0 1 1 0 1 1 0 - 0 + * 0 1 1 1 1 1 0 - 1 + * 1 1 1 0 1 1 1 - 0 + * 1 1 1 1 1 1 1 - 1 + * 2 3 3 2 - N */ - var merge = leftDataFrame.Merge(rightDataFrame, new[] { "G1", "G2" }, new[] { "G1", "G2" }); + //Act + var merge = left.Merge(right, new[] { "G1", "G2" }, new[] { "G1", "G2" }); + + //Assert + var expectedMerged = new (int? Left, int? Right)[] { + (0, 0), + (0, 1), + (1, 0), + (1, 1), + (2, null) + }; + + Assert.Equal(expectedMerged.Length, merge.Rows.Count); + Assert.Equal(merge.Columns.Count, left.Columns.Count + right.Columns.Count); + + for (long i = 0; i < expectedMerged.Length; i++) + { + MatchRowsOnMergedDataFrame(merge, left, right, i, expectedMerged[i].Left, expectedMerged[i].Right); + } } [Fact] public void TestMerge_ByThreeColumns_OneToOne_LeftJoin() { //Test left merge by to int type columns - var leftDataFrame = new DataFrame(); - leftDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); - leftDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 2 })); - leftDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 2, 1 })); - leftDataFrame.Columns.Add(new StringDataFrameColumn("G3", new[] { "A", "B", "C" })); - - var rightDataFrame = new DataFrame(); - rightDataFrame.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); - rightDataFrame.Columns.Add(new Int32DataFrameColumn("G1", new[] { 0, 1, 1 })); - rightDataFrame.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 2 })); - rightDataFrame.Columns.Add(new StringDataFrameColumn("G3", new[] { "Z", "Y", "B" })); + + //Arrange + var left = new DataFrame(); + left.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + left.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 2 })); + left.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 2, 1 })); + left.Columns.Add(new StringDataFrameColumn("G3", new[] { "A", "B", "C" })); + + var right = new DataFrame(); + right.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + right.Columns.Add(new Int32DataFrameColumn("G1", new[] { 0, 1, 1 })); + right.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 2 })); + right.Columns.Add(new StringDataFrameColumn("G3", new[] { "Z", "Y", "B" })); // Creates this case: /* ----------------------------- @@ -2002,14 +2046,30 @@ public void TestMerge_ByThreeColumns_OneToOne_LeftJoin() /* * Merge will result in a DataFrame like: - * IL G1 G2 G3 IR - * ------------------------- - * 0 1 1 A - * 1 1 2 B 2 1 2 B - * 2 2 1 C + * IL G1 G2 G3 IR Merged: + * ------------------------- + * 0 1 1 A 0 - N + * 1 1 2 B 2 1 2 B 1 - 2 + * 2 2 1 C 2 - N */ - var merge = leftDataFrame.Merge(rightDataFrame, new[] { "G1", "G2", "G3" }, new[] { "G1", "G2", "G3" }); + //Act + var merge = left.Merge(right, new[] { "G1", "G2", "G3" }, new[] { "G1", "G2", "G3" }); + + //Assert + var expectedMerged = new (int? Left, int? Right)[] { + (0, null), + (1, 2), + (2, null) + }; + + Assert.Equal(expectedMerged.Length, merge.Rows.Count); + Assert.Equal(merge.Columns.Count, left.Columns.Count + right.Columns.Count); + + for (long i = 0; i < expectedMerged.Length; i++) + { + MatchRowsOnMergedDataFrame(merge, left, right, i, expectedMerged[i].Left, expectedMerged[i].Right); + } } [Fact] From 12a58337120c70628a8a68e7fd1226d44830f566 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Mon, 7 Jun 2021 18:09:59 +0300 Subject: [PATCH 19/21] Minor changes (#5657) --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index 8ee0536d21..839b397064 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -151,7 +151,22 @@ private static bool IsAnyNullValueInColumns (IReadOnlyCollection + /// Merge DataFrames with a database style join (for backward compatibility) + /// + /// + /// + /// + /// + /// + /// + /// + public DataFrame Merge(DataFrame other, string leftJoinColumn, string rightJoinColumn, string leftSuffix = "_left", string rightSuffix = "_right", JoinAlgorithm joinAlgorithm = JoinAlgorithm.Left) + { + return Merge(other, new[] { leftJoinColumn }, new[] { rightJoinColumn }, leftSuffix, rightSuffix, joinAlgorithm); + } + private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supplementaryDataFrame, string[] retainedJoinColumnNames, string[] supplemetaryJoinColumnNames, out PrimitiveDataFrameColumn retainedRowIndices, out PrimitiveDataFrameColumn supplementaryRowIndices, bool isInner = false, bool calculateIntersection = false) { if (retainedJoinColumnNames == null) @@ -269,23 +284,7 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple return intersection; } - - /// - /// Merge DataFrames with a database style join (for backward compatibility) - /// - /// - /// - /// - /// - /// - /// - /// - public DataFrame Merge(DataFrame other, string leftJoinColumn, string rightJoinColumn, string leftSuffix = "_left", string rightSuffix = "_right", JoinAlgorithm joinAlgorithm = JoinAlgorithm.Left) - { - return Merge(other, new[] { leftJoinColumn }, new[] { rightJoinColumn }, leftSuffix, rightSuffix, joinAlgorithm); - } - - + public DataFrame Merge(DataFrame other, string[] leftJoinColumns, string[] rightJoinColumns, string leftSuffix = "_left", string rightSuffix = "_right", JoinAlgorithm joinAlgorithm = JoinAlgorithm.Left) { if (other == null) From 22fc3e2594d261ff0b55dcbbd8b1f98cfd7d0583 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Tue, 8 Jun 2021 16:20:49 +0300 Subject: [PATCH 20/21] Fix right merge by 3-columns test fails --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 41 +-- .../DataFrameTests.cs | 240 +++++++++++++++++- 2 files changed, 259 insertions(+), 22 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index 839b397064..2fb030044a 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -180,29 +180,32 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple HashSet intersection = calculateIntersection ? new HashSet() : null; - - Dictionary> occurrences = null; // Get occurrences of values in columns used for join in the retained and supplementary dataframes - Dictionary retainedIndicesReverseMapping = null; - Dictionary> rowOccurrences = null; - + Dictionary> occurrences = null; + Dictionary retainedIndicesReverseMapping = null; + HashSet supplementaryJoinColumnsNullIndices = new HashSet(); + for (int colNameIndex = 0; colNameIndex < retainedJoinColumnNames.Length; colNameIndex++) { DataFrameColumn shrinkedRetainedColumn = retainedDataFrame.Columns[retainedJoinColumnNames[colNameIndex]]; //shrink retained column by row occurrences from previouse step - if (rowOccurrences != null) + if (occurrences != null) { - var shrinkedRetainedIndices = rowOccurrences.Keys.ToArray(); - - var newRetainedIndicesReverseMapping = new Dictionary(); + //only rows with occurences from previose step should go for futher processing + var shrinkedRetainedIndices = occurrences.Keys.ToArray(); + + //create reverse mapping of index of the row in the shrinked column to the index of this row in the original dataframe (new index -> original index) + var newRetainedIndicesReverseMapping = new Dictionary(shrinkedRetainedIndices.Length); + for (int i = 0; i < shrinkedRetainedIndices.Length; i++) - { + { //store reverse mapping to restore original dataframe indices from indices in shrinked row - newRetainedIndicesReverseMapping.Add(i, retainedIndicesReverseMapping != null ? retainedIndicesReverseMapping[shrinkedRetainedIndices[i]] : shrinkedRetainedIndices[i] ); + var originalIndex = shrinkedRetainedIndices[i]; + newRetainedIndicesReverseMapping.Add(i, originalIndex); } retainedIndicesReverseMapping = newRetainedIndicesReverseMapping; @@ -211,18 +214,24 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple DataFrameColumn supplementaryColumn = supplementaryDataFrame.Columns[supplemetaryJoinColumnNames[colNameIndex]]; + //Find occurrenses on current step (join column) var newOccurrences = shrinkedRetainedColumn.GetGroupedOccurrences(supplementaryColumn, out HashSet supplementaryColumnNullIndices); + //Convert indices from in key from local (shrinked row) to indices in original dataframe + if (retainedIndicesReverseMapping != null) + newOccurrences = newOccurrences.ToDictionary(kvp => retainedIndicesReverseMapping[kvp.Key], kvp => kvp.Value); + supplementaryJoinColumnsNullIndices.UnionWith(supplementaryColumnNullIndices); // shrink join result on current column by previouse join columns (if any) - if (rowOccurrences != null) + // (we have to remove occurrences that doesn't exist in previouse columns, because JOIN happens only if ALL left and right columns in JOIN are matched) + if (occurrences != null) { var shrinkedOccurences = new Dictionary>(); foreach (var kvp in newOccurrences) { - var newValue = kvp.Value.Where(i => rowOccurrences[retainedIndicesReverseMapping[kvp.Key]].Contains(i)).ToArray(); + var newValue = kvp.Value.Where(i => occurrences[kvp.Key].Contains(i)).ToArray(); if (newValue.Any()) { shrinkedOccurences.Add(kvp.Key, newValue); @@ -230,11 +239,9 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple } newOccurrences = shrinkedOccurences; } - rowOccurrences = newOccurrences; - } - //Restore occurences - occurrences = rowOccurrences.ToDictionary(kvp => retainedIndicesReverseMapping == null ? kvp.Key : retainedIndicesReverseMapping[kvp.Key], kvp => kvp.Value); + occurrences = newOccurrences; + } retainedRowIndices = new Int64DataFrameColumn("RetainedIndices"); supplementaryRowIndices = new Int64DataFrameColumn("SupplementaryIndices"); diff --git a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs index 6d897a0b9f..6083416676 100644 --- a/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs +++ b/test/Microsoft.Data.Analysis.Tests/DataFrameTests.cs @@ -1970,9 +1970,9 @@ public void TestMerge_ByTwoColumns_Simple_ManyToMany_LeftJoin() left.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 3 })); var right = new DataFrame(); - right.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1 })); - right.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1 })); - right.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1 })); + right.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + right.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 0 })); + right.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 0 })); // Creates this case: /* --------------------------- @@ -1981,7 +1981,7 @@ public void TestMerge_ByTwoColumns_Simple_ManyToMany_LeftJoin() * --------------------------- * 0 1 1 | 0 1 1 * 1 1 1 | 1 1 1 - * 2 3 3 + * 2 3 3 | 2 0 0 */ /* @@ -2017,10 +2017,184 @@ public void TestMerge_ByTwoColumns_Simple_ManyToMany_LeftJoin() } [Fact] - public void TestMerge_ByThreeColumns_OneToOne_LeftJoin() + public void TestMerge_ByTwoColumns_Simple_ManyToMany_RightJoin() + { + //Test left merge by to int type columns + + //Arrange + var left = new DataFrame(); + left.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + left.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 3 })); + left.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 3 })); + + var right = new DataFrame(); + right.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + right.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 0 })); + right.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 0 })); + + // Creates this case: + /* --------------------------- + * Left | Right + * I G1 G2 | I G1 G2 + * --------------------------- + * 0 1 1 | 0 1 1 + * 1 1 1 | 1 1 1 + * 2 3 3 | 2 0 0 + */ + + /* + * Merge will result in a DataFrame like: + * IL G1 G2 IR Merged: + * ------------------------- + * 0 1 1 0 1 1 0 - 0 + * 1 1 1 0 1 1 1 - 0 + * 0 1 1 1 1 1 0 - 1 + * 1 1 1 1 1 1 1 - 1 + * 2 0 0 N - 2 + */ + + //Act + var merge = left.Merge(right, new[] { "G1", "G2" }, new[] { "G1", "G2" }, joinAlgorithm: JoinAlgorithm.Right); + + //Assert + var expectedMerged = new (int? Left, int? Right)[] { + (0, 0), + (1, 0), + (0, 1), + (1, 1), + (null, 2) + }; + + Assert.Equal(expectedMerged.Length, merge.Rows.Count); + Assert.Equal(merge.Columns.Count, left.Columns.Count + right.Columns.Count); + + for (long i = 0; i < expectedMerged.Length; i++) + { + MatchRowsOnMergedDataFrame(merge, left, right, i, expectedMerged[i].Left, expectedMerged[i].Right); + } + } + + [Fact] + public void TestMerge_ByTwoColumns_Simple_ManyToMany_InnerJoin() { //Test left merge by to int type columns + //Arrange + var left = new DataFrame(); + left.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + left.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 3 })); + left.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 3 })); + + var right = new DataFrame(); + right.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + right.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 0 })); + right.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 0 })); + + // Creates this case: + /* --------------------------- + * Left | Right + * I G1 G2 | I G1 G2 + * --------------------------- + * 0 1 1 | 0 1 1 + * 1 1 1 | 1 1 1 + * 2 3 3 | 2 0 0 + */ + + /* + * Merge will result in a DataFrame like: + * IL G1 G2 IR Merged: + * ------------------------- + * 0 1 1 0 1 1 0 - 0 + * 1 1 1 0 1 1 1 - 0 + * 0 1 1 1 1 1 0 - 1 + * 1 1 1 1 1 1 1 - 1 + */ + + //Act + var merge = left.Merge(right, new[] { "G1", "G2" }, new[] { "G1", "G2" }, joinAlgorithm: JoinAlgorithm.Inner); + + //Assert + var expectedMerged = new (int? Left, int? Right)[] { + (0, 0), + (1, 0), + (0, 1), + (1, 1) + }; + + Assert.Equal(expectedMerged.Length, merge.Rows.Count); + Assert.Equal(merge.Columns.Count, left.Columns.Count + right.Columns.Count); + + for (long i = 0; i < expectedMerged.Length; i++) + { + MatchRowsOnMergedDataFrame(merge, left, right, i, expectedMerged[i].Left, expectedMerged[i].Right); + } + } + + [Fact] + public void TestMerge_ByTwoColumns_Simple_ManyToMany_OuterJoin() + { + //Test left merge by to int type columns + + //Arrange + var left = new DataFrame(); + left.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + left.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 3 })); + left.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 3 })); + + var right = new DataFrame(); + right.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + right.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 0 })); + right.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 0 })); + + // Creates this case: + /* --------------------------- + * Left | Right + * I G1 G2 | I G1 G2 + * --------------------------- + * 0 1 1 | 0 1 1 + * 1 1 1 | 1 1 1 + * 2 3 3 | 2 0 0 + */ + + /* + * Merge will result in a DataFrame like: + * IL G1 G2 IR Merged: + * ------------------------- + * 0 1 1 0 1 1 0 - 0 + * 0 1 1 1 1 1 0 - 1 + * 1 1 1 0 1 1 1 - 0 + * 1 1 1 1 1 1 1 - 1 + * 2 3 3 2 - N + * 2 0 0 N - 2 + */ + + //Act + var merge = left.Merge(right, new[] { "G1", "G2" }, new[] { "G1", "G2" }, joinAlgorithm: JoinAlgorithm.FullOuter); + + //Assert + var expectedMerged = new (int? Left, int? Right)[] { + (0, 0), + (0, 1), + (1, 0), + (1, 1), + (2, null), + (null, 2) + }; + + Assert.Equal(expectedMerged.Length, merge.Rows.Count); + Assert.Equal(merge.Columns.Count, left.Columns.Count + right.Columns.Count); + + for (long i = 0; i < expectedMerged.Length; i++) + { + MatchRowsOnMergedDataFrame(merge, left, right, i, expectedMerged[i].Left, expectedMerged[i].Right); + } + } + + [Fact] + public void TestMerge_ByThreeColumns_OneToOne_LeftJoin() + { + //Test merge by LEFT join of int and string columns + //Arrange var left = new DataFrame(); left.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); @@ -2072,6 +2246,62 @@ public void TestMerge_ByThreeColumns_OneToOne_LeftJoin() } } + [Fact] + public void TestMerge_ByThreeColumns_OneToOne_RightJoin() + { + //Test merge by RIGHT join of int and string columns + + //Arrange + var left = new DataFrame(); + left.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + left.Columns.Add(new Int32DataFrameColumn("G1", new[] { 1, 1, 2 })); + left.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 2, 1 })); + left.Columns.Add(new StringDataFrameColumn("G3", new[] { "A", "B", "C" })); + + var right = new DataFrame(); + right.Columns.Add(new Int32DataFrameColumn("Index", new[] { 0, 1, 2 })); + right.Columns.Add(new Int32DataFrameColumn("G1", new[] { 0, 1, 1 })); + right.Columns.Add(new Int32DataFrameColumn("G2", new[] { 1, 1, 2 })); + right.Columns.Add(new StringDataFrameColumn("G3", new[] { "Z", "Y", "B" })); + + // Creates this case: + /* ----------------------------- + * Left | Right + * I G1 G2 G3 | I G1 G2 G3 + * ------------------------------ + * 0 1 1 A | 0 0 1 Z + * 1 1 2 B | 1 1 1 Y + * 2 2 1 C | 2 1 2 B + */ + + /* + * Merge will result in a DataFrame like: + * IL G1 G2 G3 IR Merged: + * ------------------------- + * 0 0 1 Z N - 0 + * 1 1 1 Y N - 1 + * 1 1 2 B 2 1 2 B 1 - 2 + */ + + //Act + var merge = left.Merge(right, new[] { "G1", "G2", "G3" }, new[] { "G1", "G2", "G3" }, joinAlgorithm: JoinAlgorithm.Right); + + //Assert + var expectedMerged = new (int? Left, int? Right)[] { + (null, 0), + (null, 1), + (1, 2) + }; + + Assert.Equal(expectedMerged.Length, merge.Rows.Count); + Assert.Equal(merge.Columns.Count, left.Columns.Count + right.Columns.Count); + + for (long i = 0; i < expectedMerged.Length; i++) + { + MatchRowsOnMergedDataFrame(merge, left, right, i, expectedMerged[i].Left, expectedMerged[i].Right); + } + } + [Fact] public void TestMerge_Issue5778() { From 2f22cd795d8408b098edf60e23bfeb9adc08b954 Mon Sep 17 00:00:00 2001 From: Alexey Smirnov Date: Sat, 4 Sep 2021 20:10:30 +0300 Subject: [PATCH 21/21] fixed typos (#5657) --- src/Microsoft.Data.Analysis/DataFrame.Join.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Microsoft.Data.Analysis/DataFrame.Join.cs b/src/Microsoft.Data.Analysis/DataFrame.Join.cs index 2fb030044a..2af42a5566 100644 --- a/src/Microsoft.Data.Analysis/DataFrame.Join.cs +++ b/src/Microsoft.Data.Analysis/DataFrame.Join.cs @@ -192,7 +192,7 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple { DataFrameColumn shrinkedRetainedColumn = retainedDataFrame.Columns[retainedJoinColumnNames[colNameIndex]]; - //shrink retained column by row occurrences from previouse step + //shrink retained column by row occurrences from previous step if (occurrences != null) { //only rows with occurences from previose step should go for futher processing @@ -223,8 +223,8 @@ private static HashSet Merge(DataFrame retainedDataFrame, DataFrame supple supplementaryJoinColumnsNullIndices.UnionWith(supplementaryColumnNullIndices); - // shrink join result on current column by previouse join columns (if any) - // (we have to remove occurrences that doesn't exist in previouse columns, because JOIN happens only if ALL left and right columns in JOIN are matched) + // shrink join result on current column by previous join columns (if any) + // (we have to remove occurrences that doesn't exist in previous columns, because JOIN happens only if ALL left and right columns in JOIN are matched) if (occurrences != null) { var shrinkedOccurences = new Dictionary>();