From 6668a62eea6a70b79379d0c1bebcc6718f106043 Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Thu, 29 Oct 2020 17:24:40 -0700 Subject: [PATCH 1/2] Fix Bulk Copy Async deadlock with custom IDataReader using SqlDataReader internally. Fixes #755 --- .../src/Microsoft/Data/SqlClient/SqlBulkCopy.cs | 10 ++++++++++ .../src/Microsoft/Data/SqlClient/SqlBulkCopy.cs | 14 ++++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs index 8b8f689d0c..fd5778c28e 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs @@ -1159,6 +1159,11 @@ private Task ReadFromRowSourceAsync(CancellationToken cts) } else { // This will call Read for DataRows, DataTable and IDataReader (this includes all IDataReader except DbDataReader) + // Release lock to prevent possible deadlocks + SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection(); + bool semaphoreLock = internalConnection._parserLock.CanBeReleasedFromAnyThread; + internalConnection._parserLock.Release(); + _hasMoreRowToCopy = false; try { @@ -1175,6 +1180,11 @@ private Task ReadFromRowSourceAsync(CancellationToken cts) throw; } } + finally + { + _insideRowsCopiedEvent = false; + internalConnection._parserLock.Wait(canReleaseFromAnyThread: semaphoreLock); + } return null; } } diff --git a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs index fdfbc1a71b..171834985d 100644 --- a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs +++ b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs @@ -1261,7 +1261,7 @@ private Task ReadFromRowSourceAsync(CancellationToken cts) { if (_isAsyncBulkCopy && (_DbDataReaderRowSource != null)) { - //This will call ReadAsync for DbDataReader (for SqlDataReader it will be truely async read; for non-SqlDataReader it may block.) + // This will call ReadAsync for DbDataReader (for SqlDataReader it will be truely async read; for non-SqlDataReader it may block.) return _DbDataReaderRowSource.ReadAsync(cts).ContinueWith((t) => { if (t.Status == TaskStatus.RanToCompletion) @@ -1272,7 +1272,12 @@ private Task ReadFromRowSourceAsync(CancellationToken cts) }, TaskScheduler.Default).Unwrap(); } else - { //this will call Read for DataRows, DataTable and IDataReader (this includes all IDataReader except DbDataReader) + { // This will call Read for DataRows, DataTable and IDataReader (this includes all IDataReader except DbDataReader) + // Release lock to prevent possible deadlocks + SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection(); + bool semaphoreLock = internalConnection._parserLock.CanBeReleasedFromAnyThread; + internalConnection._parserLock.Release(); + _hasMoreRowToCopy = false; try { @@ -1291,6 +1296,11 @@ private Task ReadFromRowSourceAsync(CancellationToken cts) throw; } } + finally + { + _insideRowsCopiedEvent = false; + internalConnection._parserLock.Wait(canReleaseFromAnyThread: semaphoreLock); + } return null; } } From d406009438bfca6ca862e5ea600a8a0ca73affc8 Mon Sep 17 00:00:00 2001 From: Cheena Malhotra Date: Fri, 30 Oct 2020 13:39:57 -0700 Subject: [PATCH 2/2] Revert unwanted flag change --- .../Microsoft/Data/SqlClient/SqlBulkCopy.cs | 1 - .../Microsoft/Data/SqlClient/SqlBulkCopy.cs | 77 +++++++++---------- 2 files changed, 38 insertions(+), 40 deletions(-) diff --git a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs index fd5778c28e..344d517618 100644 --- a/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs +++ b/src/Microsoft.Data.SqlClient/netcore/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs @@ -1182,7 +1182,6 @@ private Task ReadFromRowSourceAsync(CancellationToken cts) } finally { - _insideRowsCopiedEvent = false; internalConnection._parserLock.Wait(canReleaseFromAnyThread: semaphoreLock); } return null; diff --git a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs index 171834985d..f95a76348d 100644 --- a/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs +++ b/src/Microsoft.Data.SqlClient/netfx/src/Microsoft/Data/SqlClient/SqlBulkCopy.cs @@ -186,7 +186,7 @@ private enum ValueSourceType DbDataReader } - // Enum for specifying SqlDataReader.Get method used + // Enum for specifying SqlDataReader.Get method used private enum ValueMethod : byte { GetValue, @@ -309,7 +309,7 @@ private int RowNumber // for debug purpose only. // TODO: I will make this internal to use Reflection. #if DEBUG - internal static bool _setAlwaysTaskOnWrite = false; //when set and in DEBUG mode, TdsParser::WriteBulkCopyValue will always return a task + internal static bool _setAlwaysTaskOnWrite = false; //when set and in DEBUG mode, TdsParser::WriteBulkCopyValue will always return a task internal static bool SetAlwaysTaskOnWrite { set @@ -541,8 +541,8 @@ private bool IsCopyOption(SqlBulkCopyOptions copyOption) return (_copyOptions & copyOption) == copyOption; } - //Creates the initial query string, but does not execute it. - // + //Creates the initial query string, but does not execute it. + // private string CreateInitialQuery() { string[] parts; @@ -563,7 +563,7 @@ private string CreateInitialQuery() TDSCommand = "select @@trancount; SET FMTONLY ON select * from " + ADP.BuildMultiPartName(parts) + " SET FMTONLY OFF "; if (_connection.IsShiloh) { - // If its a temp DB then try to connect + // If its a temp DB then try to connect string TableCollationsStoredProc; if (_connection.IsKatmaiOrNewer) @@ -626,9 +626,9 @@ private string CreateInitialQuery() } // Creates and then executes initial query to get information about the targettable - // When __isAsyncBulkCopy == false (i.e. it is Sync copy): out result contains the resulset. Returns null. - // When __isAsyncBulkCopy == true (i.e. it is Async copy): This still uses the _parser.Run method synchronously and return Task. - // We need to have a _parser.RunAsync to make it real async. + // When __isAsyncBulkCopy == false (i.e. it is Sync copy): out result contains the resulset. Returns null. + // When __isAsyncBulkCopy == true (i.e. it is Async copy): This still uses the _parser.Run method synchronously and return Task. + // We need to have a _parser.RunAsync to make it real async. private Task CreateAndExecuteInitialQueryAsync(out BulkCopySimpleResultSet result) { string TDSCommand = CreateInitialQuery(); @@ -1089,7 +1089,7 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b return new TextDataFeed(_DbDataReaderRowSource.GetTextReader(sourceOrdinal)); case ValueMethod.DataFeedXml: // Only SqlDataReader supports an XmlReader - // There is no GetXmlReader on DbDataReader, however if GetValue returns XmlReader we will read it as stream if it is assigned to XML field + // There is no GetXmlReader on DbDataReader, however if GetValue returns XmlReader we will read it as stream if it is assigned to XML field Debug.Assert(_SqlDataReaderRowSource != null, "Should not be reading row as an XmlReader if bulk copy source is not a SqlDataReader"); return new XmlDataFeed(_SqlDataReaderRowSource.GetXmlReader(sourceOrdinal)); default: @@ -1148,7 +1148,7 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b { Debug.Assert(!(value is INullable) || !((INullable)value).IsNull, "IsDBNull returned false, but GetValue returned a null INullable"); } -#endif +#endif return value; } } @@ -1242,7 +1242,7 @@ private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out b } } - // If we are here then either the value is null, there was no special storage type for this column or the special storage type wasn't handled (e.g. if the currentRowValue is NaN) + // If we are here then either the value is null, there was no special storage type for this column or the special storage type wasn't handled (e.g. if the currentRowValue is NaN) return currentRowValue; } default: @@ -1261,7 +1261,7 @@ private Task ReadFromRowSourceAsync(CancellationToken cts) { if (_isAsyncBulkCopy && (_DbDataReaderRowSource != null)) { - // This will call ReadAsync for DbDataReader (for SqlDataReader it will be truely async read; for non-SqlDataReader it may block.) + // This will call ReadAsync for DbDataReader (for SqlDataReader it will be truely async read; for non-SqlDataReader it may block.) return _DbDataReaderRowSource.ReadAsync(cts).ContinueWith((t) => { if (t.Status == TaskStatus.RanToCompletion) @@ -1298,7 +1298,6 @@ private Task ReadFromRowSourceAsync(CancellationToken cts) } finally { - _insideRowsCopiedEvent = false; internalConnection._parserLock.Wait(canReleaseFromAnyThread: semaphoreLock); } return null; @@ -1384,7 +1383,7 @@ private SourceColumnMetadata GetColumnMetadata(int ordinal) else if (typeof(SqlSingle) == t || typeof(float) == t) { isSqlType = true; - method = ValueMethod.SqlTypeSqlSingle; // Source Type SqlSingle + method = ValueMethod.SqlTypeSqlSingle; // Source Type SqlSingle } else { @@ -1620,7 +1619,7 @@ private object ValidateBulkCopyVariant(object value) case TdsEnums.SQLDATETIME2: case TdsEnums.SQLDATETIMEOFFSET: if (value is INullable) - { // Current limitation in the SqlBulkCopy Variant code limits BulkCopy to CLR/COM Types. + { // Current limitation in the SqlBulkCopy Variant code limits BulkCopy to CLR/COM Types. return MetaType.GetComValueFromSqlVariant(value); } else @@ -1719,7 +1718,7 @@ private object ConvertValue(object value, _SqlMetaData metadata, bool isNull, re // Perf: It is more efficient to write a SqlDecimal than a decimal since we need to break it into its 'bits' when writing value = sqlValue; isSqlType = true; - typeChanged = false; // Setting this to false as SqlParameter.CoerceValue will only set it to true when converting to a CLR type + typeChanged = false; // Setting this to false as SqlParameter.CoerceValue will only set it to true when converting to a CLR type break; case TdsEnums.SQLINTN: @@ -1789,7 +1788,7 @@ private object ConvertValue(object value, _SqlMetaData metadata, bool isNull, re } break; case TdsEnums.SQLXMLTYPE: - // Could be either string, SqlCachedBuffer, XmlReader or XmlDataFeed + // Could be either string, SqlCachedBuffer, XmlReader or XmlDataFeed Debug.Assert((value is XmlReader) || (value is SqlCachedBuffer) || (value is string) || (value is SqlString) || (value is XmlDataFeed), "Invalid value type of Xml datatype"); if (value is XmlReader) { @@ -1852,7 +1851,7 @@ public void WriteToServer(DbDataReader reader) _dataTableSource = null; _rowSourceType = ValueSourceType.DbDataReader; _isAsyncBulkCopy = false; - WriteRowSourceToServerAsync(reader.FieldCount, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false; + WriteRowSourceToServerAsync(reader.FieldCount, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false; } finally { @@ -1889,7 +1888,7 @@ public void WriteToServer(IDataReader reader) _dataTableSource = null; _rowSourceType = ValueSourceType.IDataReader; _isAsyncBulkCopy = false; - WriteRowSourceToServerAsync(reader.FieldCount, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false; + WriteRowSourceToServerAsync(reader.FieldCount, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false; } finally { @@ -1930,7 +1929,7 @@ public void WriteToServer(DataTable table, DataRowState rowState) _rowEnumerator = table.Rows.GetEnumerator(); _isAsyncBulkCopy = false; - WriteRowSourceToServerAsync(table.Columns.Count, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false; + WriteRowSourceToServerAsync(table.Columns.Count, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false; } finally { @@ -1974,7 +1973,7 @@ public void WriteToServer(DataRow[] rows) _rowEnumerator = rows.GetEnumerator(); _isAsyncBulkCopy = false; - WriteRowSourceToServerAsync(table.Columns.Count, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false; + WriteRowSourceToServerAsync(table.Columns.Count, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false; } finally { @@ -2034,7 +2033,7 @@ public Task WriteToServerAsync(DataRow[] rows, CancellationToken cancellationTok _rowSourceType = ValueSourceType.RowArray; _rowEnumerator = rows.GetEnumerator(); _isAsyncBulkCopy = true; - resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); //It returns Task since _isAsyncBulkCopy = true; + resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); //It returns Task since _isAsyncBulkCopy = true; } finally { @@ -2075,7 +2074,7 @@ public Task WriteToServerAsync(DbDataReader reader, CancellationToken cancellati _dataTableSource = null; _rowSourceType = ValueSourceType.DbDataReader; _isAsyncBulkCopy = true; - resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); //It returns Task since _isAsyncBulkCopy = true; + resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); //It returns Task since _isAsyncBulkCopy = true; } finally { @@ -2116,7 +2115,7 @@ public Task WriteToServerAsync(IDataReader reader, CancellationToken cancellatio _dataTableSource = null; _rowSourceType = ValueSourceType.IDataReader; _isAsyncBulkCopy = true; - resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); //It returns Task since _isAsyncBulkCopy = true; + resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); //It returns Task since _isAsyncBulkCopy = true; } finally { @@ -2170,7 +2169,7 @@ public Task WriteToServerAsync(DataTable table, DataRowState rowState, Cancellat _rowSourceType = ValueSourceType.DataTable; _rowEnumerator = table.Rows.GetEnumerator(); _isAsyncBulkCopy = true; - resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); //It returns Task since _isAsyncBulkCopy = true; + resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); //It returns Task since _isAsyncBulkCopy = true; } finally { @@ -2179,7 +2178,7 @@ public Task WriteToServerAsync(DataTable table, DataRowState rowState, Cancellat return resultTask; } - // Writes row source. + // Writes row source. // private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctoken) { @@ -2437,9 +2436,9 @@ private bool FireRowsCopiedEvent(long rowsCopied) return eventArgs.Abort; } - // Reads a cell and then writes it. + // Reads a cell and then writes it. // Read may block at this moment since there is no getValueAsync or DownStream async at this moment. - // When _isAsyncBulkCopy == true: Write will return Task (when async method runs asynchronously) or Null (when async call actually ran synchronously) for performance. + // When _isAsyncBulkCopy == true: Write will return Task (when async method runs asynchronously) or Null (when async call actually ran synchronously) for performance. // When _isAsyncBulkCopy == false: Writes are purely sync. This method reutrn null at the end. // private Task ReadWriteColumnValueAsync(int col) @@ -2573,7 +2572,7 @@ private void CopyColumnsAsyncSetupContinuation(TaskCompletionSource sour } - // The notification logic. + // The notification logic. // private void CheckAndRaiseNotification() { @@ -2646,11 +2645,11 @@ private void CheckAndRaiseNotification() Debug.Assert(writeTask == null, "Task should not pend while doing sync bulk copy"); RunParser(); AbortTransaction(); - throw exception; //this will be caught and put inside the Task's exception. + throw exception; //this will be caught and put inside the Task's exception. } } - // Checks for cancellation. If cancel requested, cancels the task and returns the cancelled task + // Checks for cancellation. If cancel requested, cancels the task and returns the cancelled task Task CheckForCancellation(CancellationToken cts, TaskCompletionSource tcs) { if (cts.IsCancellationRequested) @@ -2698,7 +2697,7 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts, int i; try { - //totalRows is batchsize which is 0 by default. In that case, we keep copying till the end (until _hasMoreRowToCopy == false). + //totalRows is batchsize which is 0 by default. In that case, we keep copying till the end (until _hasMoreRowToCopy == false). for (i = rowsSoFar; (totalRows <= 0 || i < totalRows) && _hasMoreRowToCopy == true; i++) { if (_isAsyncBulkCopy == true) @@ -2715,10 +2714,10 @@ private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts, task = CopyColumnsAsync(0); //copy 1 row if (task == null) - { //tsk is done. + { //tsk is done. CheckAndRaiseNotification(); //check notification logic after copying the row - //now we will read the next row. + //now we will read the next row. Task readTask = ReadFromRowSourceAsync(cts); // read the next row. Caution: more is only valid if the task returns null. Otherwise, we wait for Task.Result if (readTask != null) { @@ -3056,7 +3055,7 @@ private void CleanUpStateObject(bool isCancelRequested = true) // The continuation part of WriteToServerInternalRest. Executes when the initial query task is completed. (see, WriteToServerInternalRest). // It carries on the source which is passed from the WriteToServerInternalRest and performs SetResult when the entire copy is done. // The carried on source may be null in case of Sync copy. So no need to SetResult at that time. - // It launches the copy operation. + // It launches the copy operation. // private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet internalResults, CancellationToken cts, TaskCompletionSource source) { @@ -3091,7 +3090,7 @@ private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet int } AsyncHelper.ContinueTask(task, source, () => { - //Bulk copy task is completed at this moment. + //Bulk copy task is completed at this moment. //Todo: The cases may be combined for code reuse. if (task.IsCanceled) { @@ -3177,7 +3176,7 @@ private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet int } } - // Rest of the WriteToServerInternalAsync method. + // Rest of the WriteToServerInternalAsync method. // It carries on the source from its caller WriteToServerInternal. // source is null in case of Sync bcp. But valid in case of Async bcp. // It calls the WriteToServerInternalRestContinuedAsync as a continuation of the initial query task. @@ -3223,7 +3222,7 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio regReconnectCancel = cts.Register(() => cancellableReconnectTS.TrySetCanceled()); } AsyncHelper.ContinueTask(reconnectTask, cancellableReconnectTS, () => { cancellableReconnectTS.SetResult(null); }); - // no need to cancel timer since SqlBulkCopy creates specific task source for reconnection + // no need to cancel timer since SqlBulkCopy creates specific task source for reconnection AsyncHelper.SetTimeoutException(cancellableReconnectTS, BulkCopyTimeout, () => { return SQL.BulkLoadInvalidDestinationTable(_destinationTableName, SQL.CR_ReconnectTimeout()); }, CancellationToken.None); AsyncHelper.ContinueTask(cancellableReconnectTS.Task, source, @@ -3266,7 +3265,7 @@ private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletio _connection.AddWeakReference(this, SqlReferenceCollection.BulkCopyTag); } - internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we already have the parser lock + internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we already have the parser lock try {