Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(postgres, sqlite): support where clause in 'on conflict do update set ... WHERE <condition>' #17144

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/core/src/dialects/abstract/index.ts
Expand Up @@ -108,6 +108,7 @@ export type DialectSupports = {
inserts: {
ignoreDuplicates: string /* dialect specific words for INSERT IGNORE or DO NOTHING */;
updateOnDuplicate: boolean | string /* whether dialect supports ON DUPLICATE KEY UPDATE */;
onConflictUpdateWhere: boolean /* whether dialect supports ON CONFLICT DU UPDATE SET ... WHERE */;
onConflictDoNothing: string /* dialect specific words for ON CONFLICT DO NOTHING */;
onConflictWhere: boolean /* whether dialect supports ON CONFLICT WHERE */;
conflictFields: boolean /* whether the dialect supports specifying conflict fields or not */;
Expand Down Expand Up @@ -315,6 +316,7 @@ export abstract class AbstractDialect {
inserts: {
ignoreDuplicates: '',
updateOnDuplicate: false,
onConflictUpdateWhere: false,
onConflictDoNothing: '',
onConflictWhere: false,
conflictFields: false,
Expand Down
28 changes: 26 additions & 2 deletions packages/core/src/dialects/abstract/query-generator.js
Expand Up @@ -21,6 +21,7 @@ import { Col } from '../../expression-builders/col.js';
import { Literal } from '../../expression-builders/literal.js';
import { conformIndex } from '../../model-internals';
import { and } from '../../sequelize';
import { isString } from '../../utils/check';
import { mapFinderOptions, removeNullishValuesFromHash } from '../../utils/format';
import { joinSQLFragments } from '../../utils/join-sql-fragments';
import { isModelStatic } from '../../utils/model-utils';
Expand Down Expand Up @@ -169,6 +170,13 @@ export class AbstractQueryGenerator extends AbstractQueryGeneratorTypeScript {
throw new Error('missing dialect support for conflictWhere option');
}

if (
!isEmpty(options.onConflictUpdateWhere) &&
!this.dialect.supports.inserts.onConflictUpdateWhere
) {
throw new Error('missing dialect support for onConflictUpdateWhere option');
}

// `options.updateOnDuplicate` is the list of field names to update if a duplicate key is hit during the insert. It
// contains just the field names. This option is _usually_ explicitly set by the corresponding query-interface
// upsert function.
Expand All @@ -184,7 +192,7 @@ export class AbstractQueryGenerator extends AbstractQueryGeneratorTypeScript {
const fragments = ['ON CONFLICT', '(', conflictKeys.join(','), ')'];

if (!isEmpty(options.conflictWhere)) {
fragments.push(this.whereQuery(options.conflictWhere, options));
fragments.push(this.whereQuery(options.conflictWhere, { ...options }));
}

// if update keys are provided, then apply them here. if there are no updateKeys provided, then do not try to
Expand All @@ -193,6 +201,14 @@ export class AbstractQueryGenerator extends AbstractQueryGeneratorTypeScript {
fragments.push('DO NOTHING');
} else {
fragments.push('DO UPDATE SET', updateKeys.join(','));
if (
this.dialect.supports.inserts.onConflictUpdateWhere &&
options.onConflictUpdateWhere
) {
// We need to use the mainAlias here to avoid ambiguous column names in the conditional update clause between the main table and the EXCLUDED values
options.mainAlias = isString(table) ? table : table.tableName;
fragments.push(this.whereQuery(options.onConflictUpdateWhere, options));
}
}

onDuplicateKeyUpdate = ` ${joinSQLFragments(fragments)}`;
Expand Down Expand Up @@ -281,7 +297,7 @@ export class AbstractQueryGenerator extends AbstractQueryGeneratorTypeScript {
/**
* Returns an insert into command for multiple values.
*
* @param {string} tableName
* @param {TableName} tableName
* @param {object} fieldValueHashes
* @param {object} options
* @param {object} fieldMappedAttributes
Expand Down Expand Up @@ -348,6 +364,13 @@ export class AbstractQueryGenerator extends AbstractQueryGeneratorTypeScript {
whereClause = this.whereQuery(options.conflictWhere, options);
}

let conditionalUpdateClause = '';
if (this.dialect.supports.inserts.onConflictUpdateWhere && options.onConflictUpdateWhere) {
// We need to use the mainAlias here to avoid ambiguous column names in the conditional update clause between the main table and the EXCLUDED values
options.mainAlias = isString(tableName) ? tableName : tableName.tableName;
conditionalUpdateClause = this.whereQuery(options.onConflictUpdateWhere, options);
}

// The Utils.joinSQLFragments later on will join this as it handles nested arrays.
onDuplicateKeyUpdate = [
'ON CONFLICT',
Expand All @@ -357,6 +380,7 @@ export class AbstractQueryGenerator extends AbstractQueryGeneratorTypeScript {
whereClause,
'DO UPDATE SET',
updateKeys.join(','),
conditionalUpdateClause,
];
} else {
// mysql / maria
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/dialects/postgres/index.ts
Expand Up @@ -41,6 +41,7 @@ export class PostgresDialect extends AbstractDialect {
updateOnDuplicate: ' ON CONFLICT DO UPDATE SET',
conflictFields: true,
onConflictWhere: true,
onConflictUpdateWhere: true,
},
dataTypes: {
ARRAY: true,
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/dialects/sqlite/index.ts
Expand Up @@ -18,6 +18,7 @@ export class SqliteDialect extends AbstractDialect {
updateOnDuplicate: ' ON CONFLICT DO UPDATE SET',
conflictFields: true,
onConflictWhere: true,
onConflictUpdateWhere: true,
},
index: {
using: false,
Expand Down
21 changes: 20 additions & 1 deletion packages/core/src/model.d.ts
Expand Up @@ -1069,7 +1069,16 @@ export interface CreateOptions<TAttributes = any>
* dialect specific ON CONFLICT DO NOTHING / INSERT IGNORE
*/
ignoreDuplicates?: boolean;

/**
* Fields to update if row key already exists (on duplicate key update)? (only supported by MySQL,
* MariaDB, SQLite >= 3.24.0 & Postgres >= 9.5).
*/
updateOnDuplicate?: Array<keyof TAttributes>;
/**
* An optional parameter that specifies a where clause for the 'ON CONFLICT DO UPDATE SET ... WHERE' part of the query
* Only supported in Postgres >= 9.5 and SQLite >= 3.35.0
*/
onConflictUpdateWhere?: WhereOptions<TAttributes>;
/**
* Return the affected rows (only for postgres)
*/
Expand Down Expand Up @@ -1150,6 +1159,11 @@ export interface UpsertOptions<TAttributes = any>
* Only supported in Postgres >= 9.5 and SQLite >= 3.24.0
*/
conflictWhere?: WhereOptions<TAttributes>;
/**
* An optional parameter that specifies a where clause for the 'ON CONFLICT DO UPDATE SET ... WHERE' part of the query
* Only supported in Postgres >= 9.5 and SQLite >= 3.35.0
*/
onConflictUpdateWhere?: WhereOptions<TAttributes>;
/**
* Optional override for the conflict fields in the ON CONFLICT part of the query.
* Only supported in Postgres >= 9.5 and SQLite >= 3.24.0
Expand Down Expand Up @@ -1215,6 +1229,11 @@ export interface BulkCreateOptions<TAttributes = any>
* Only supported in Postgres >= 9.5 and sqlite >= 9.5
*/
conflictWhere?: WhereOptions<TAttributes>;
/**
* An optional parameter that specifies a where clause for the 'ON CONFLICT DO UPDATE SET ... WHERE' part of the query
* Only supported in Postgres >= 9.5 and SQLite >= 3.35.0
*/
onConflictUpdateWhere?: WhereOptions<TAttributes>;
/**
* Optional override for the conflict fields in the ON CONFLICT part of the query.
* Only supported in Postgres >= 9.5 and SQLite >= 3.24.0
Expand Down
5 changes: 5 additions & 0 deletions packages/core/src/model.js
Expand Up @@ -2337,6 +2337,10 @@ ${associationOwner._getAssociationDebugList()}`);
throw new Error(`${dialect} does not support the updateOnDuplicate option.`);
}

if (options.onConflictUpdateWhere && !['postgres', 'sqlite'].includes(dialect)) {
throw new Error(`${dialect} does not support the conflictUpdateWhere option.`);
}

const model = options.model;
const modelDefinition = model.modelDefinition;

Expand Down Expand Up @@ -2476,6 +2480,7 @@ ${associationOwner._getAssociationDebugList()}`);
fieldMappedAttributes[attribute.columnName] = attribute;
}

// TODO: abdou
// Map updateOnDuplicate attributes to fields
if (options.updateOnDuplicate) {
options.updateOnDuplicate = options.updateOnDuplicate.map(attrName => {
Expand Down
73 changes: 73 additions & 0 deletions packages/core/test/integration/model/bulk-create.test.js
Expand Up @@ -1213,6 +1213,79 @@ describe('Model', () => {
});
});
}

if (dialect.supports.inserts.onConflictUpdateWhere) {
describe('onConflictUpdateWhere', () => {
const options = {
conflictAttributes: ['uniqueName'],
updateOnDuplicate: ['secretValue', 'intVal'],
onConflictUpdateWhere: {
intVal: { [Op.gte]: 15 },
},
};
const initialUsers = [
{ uniqueName: 'Peter', secretValue: '11', intVal: 7 },
{ uniqueName: 'Abdou', secretValue: '25', intVal: 10 },
];

it('should ignore the onConflictUpdateWhere option if conflict is detected but where clause does not meet the conditions ', async function () {
await this.User.bulkCreate(initialUsers);
const conflictingData = [
{ uniqueName: 'Peter', secretValue: '33', intVal: 10 },
{ uniqueName: 'Abdou', secretValue: '56', intVal: 13 },
];

await this.User.bulkCreate(conflictingData, options);
const users = await this.User.findAll({ order: ['id'] });
expect(users[0].secretValue).to.equal(initialUsers[0].secretValue);
expect(users[0].intVal).to.equal(initialUsers[0].intVal);
expect(users[1].secretValue).to.equal(initialUsers[1].secretValue);
expect(users[1].intVal).to.equal(initialUsers[1].intVal);
});

it('should update the record if a conflict is detected and the where clause meets the condition', async function () {
await this.User.bulkCreate(initialUsers);
const conflictingData = [
{ uniqueName: 'Peter', secretValue: '33', intVal: 10 },
{ uniqueName: 'Abdou', secretValue: '56', intVal: 13 },
];

await this.User.bulkCreate(conflictingData, {
...options,
onConflictUpdateWhere: { intVal: { [Op.gte]: 6 } },
});
const users = await this.User.findAll({ order: ['id'] });
expect(users[0].secretValue).to.equal(conflictingData[0].secretValue);
expect(users[0].intVal).to.equal(conflictingData[0].intVal);
expect(users[1].secretValue).to.equal(conflictingData[1].secretValue);
expect(users[1].intVal).to.equal(conflictingData[1].intVal);
});
});
} else {
it('should throw an error when the onConflictUpdateWhere option is passed', async function () {
const data = [
{ uniqueName: 'Peter', secretValue: '42' },
{ uniqueName: 'Paul', secretValue: '23' },
];

await this.User.bulkCreate(data);
data.push({ uniqueName: 'Michael', secretValue: '26' });

try {
await this.User.bulkCreate(data, {
conflictAttributes: ['uniqueName'],
updateOnDuplicate: ['secretValue', 'intVal'],
onConflictUpdateWhere: {
intVal: { [Op.gte]: 15 },
},
});
} catch (error) {
expect(error.message).to.equal(
`${dialectName} does not support the onConflictUpdateWhere option.`,
);
}
});
}
}
});
}
Expand Down
58 changes: 57 additions & 1 deletion packages/core/test/integration/model/upsert.test.js
Expand Up @@ -3,7 +3,7 @@
const { expect } = require('chai');
const sinon = require('sinon');
const { beforeEach2, sequelize } = require('../support');
const { DataTypes, Sequelize, sql } = require('@sequelize/core');
const { DataTypes, Op, Sequelize, sql } = require('@sequelize/core');

const dialectName = sequelize.dialect.name;

Expand Down Expand Up @@ -1047,6 +1047,62 @@ describe('Model', () => {
});
});
}

if (sequelize.dialect.supports.inserts.onConflictUpdateWhere) {
describe('conflictUpdateWhere', () => {
const initalUser = { name: 'Abdou', intVal: 10, uniqueId: 'Abdou-1' };
const options = {
conflictAttributes: ['uniqueId'],
updateOnDuplicate: ['intVal'],
upsertKeys: ['uniqueId'],
onConflictUpdateWhere: {
intVal: { [Op.gte]: 15 },
},
isNewRecord: true,
};
const vars = beforeEach2(async () => {
const User = sequelize.define('users', {
name: DataTypes.STRING,
intVal: DataTypes.INTEGER,
uniqueId: { type: DataTypes.STRING, unique: true },
});

await User.sync({ force: true });
await User.create(initalUser);

return { User };
});

it('Should keep old entry when conflict if where condition is not met', async () => {
const { User } = vars;
try {
// sqlite won't throw an error
await User.create({ name: 'Abdou', intVal: 20, uniqueId: 'Abdou-1' }, options);
} catch (error) {
// postgres will throw a unique constraint error when on conflict update where condition is not met
expect(error).to.be.an.instanceOf(Sequelize.EmptyResultError);
}

const user = await User.findOne({ where: { uniqueId: 'Abdou-1' } });
expect(user.intVal).to.eq(10);
});

it('Should update exisiting record on conflict if where condition is met', async () => {
const { User } = vars;
const user = await User.create(
{ name: 'Abdou', intVal: 20, uniqueId: 'Abdou-1' },
{
...options,
onConflictUpdateWhere: {
intVal: { [Op.gte]: 9 },
},
},
);

expect(user.intVal).to.eq(20);
});
});
}
});
}
});
Expand Up @@ -358,6 +358,23 @@ if (dialect.startsWith('postgres')) {
bind: { sequelize_1: 'foo' },
},
},
{
arguments: [
'myTable',
{ name: 'foo' },
{},
{
updateOnDuplicate: ['name'],
upsertKeys: ['name'],
onConflictUpdateWhere: { id: 10 },
},
],
expectation: {
query:
'INSERT INTO "myTable" ("name") VALUES ($sequelize_1) ON CONFLICT ("name") DO UPDATE SET "name"=EXCLUDED."name" WHERE "myTable"."id" = 10;',
bind: { sequelize_1: 'foo' },
},
},
{
arguments: ['myTable', { name: 'foo' }, {}, { returning: true }],
expectation: {
Expand Down Expand Up @@ -705,6 +722,19 @@ if (dialect.startsWith('postgres')) {
expectation:
'INSERT INTO "mySchema"."myTable" ("name") VALUES (\'foo\'),(\'bar\') ON CONFLICT ("name") DO UPDATE SET "name"=EXCLUDED."name";',
},
{
arguments: [
{ schema: 'mySchema', tableName: 'myTable' },
[{ name: 'foo' }, { name: 'bar' }],
{
updateOnDuplicate: ['name'],
upsertKeys: ['name'],
onConflictUpdateWhere: { id: 10 },
},
],
expectation:
'INSERT INTO "mySchema"."myTable" ("name") VALUES (\'foo\'),(\'bar\') ON CONFLICT ("name") DO UPDATE SET "name"=EXCLUDED."name" WHERE "myTable"."id" = 10;',
},

// Variants when quoteIdentifiers is false
{
Expand Down