403 Stimmen

Wie kann UPSERT (MERGE, INSERT ... ON DUPLICATE UPDATE) in PostgreSQL durchgeführt werden?

Eine sehr häufig gestellte Frage hier ist, wie man ein Upsert durchführt, was MySQL als INSERT ... ON DUPLICATE UPDATE bezeichnet und was der Standard als Teil der MERGE-Operation unterstützt.

Da PostgreSQL es nicht direkt unterstützt (vor pg 9.5), wie macht man das? Betrachten Sie das Folgende:

CREATE TABLE testtable (
    id integer PRIMARY KEY,
    somedata text NOT NULL
);

INSERT INTO testtable (id, somedata) VALUES
(1, 'fred'),
(2, 'bob');

Stellen Sie sich nun vor, Sie möchten die Tupel (2, 'Joe'), (3, 'Alan') "upserten", sodass der neue Tabelleninhalt wäre:

(1, 'fred'),
(2, 'Joe'),    -- Wert des vorhandenen Tupels geändert
(3, 'Alan')    -- Neues Tupel hinzugefügt

Das ist es, worüber die Leute sprechen, wenn sie über ein upsert diskutieren. Wesentlich ist, dass jeder Ansatz sicher ist in Anbetracht mehrerer Transaktionen, die an der gleichen Tabelle arbeiten - entweder durch die Verwendung von expliziten Sperren oder anderweitiges Verteidigen gegen die resultierenden Rennbedingungen.

Dieses Thema wird ausführlich diskutiert unter Insert, on duplicate update in PostgreSQL?, aber das ist über Alternativen zur MySQL-Syntax und hat im Laufe der Zeit eine ziemliche Menge an nicht verwandten Details angesammelt. Ich arbeite an abschließenden Antworten.

Diese Techniken sind auch nützlich für "einfügen, wenn nicht vorhanden, ansonsten nichts tun", d.h. "insert ... on duplicate key ignore".

1voto

reubano Punkte 4501

Seit diese Frage geschlossen wurde, poste ich hier, wie man es mit SQLAlchemy macht. Über Rekursion versucht es, einen Bulk-Insert oder -Update gegen Rennbedingungen und Validierungsfehler zu bekämpfen.

Zuerst die Imports

import itertools as it

from functools import partial
from operator import itemgetter

from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts

Jetzt ein paar Hilfsfunktionen

def chunk(content, chunksize=None):
    """Gruppiert Daten in Chunks, jeder mit (maximal) `chunksize` Elementen.
    https://stackoverflow.com/a/22919323/408556
    """
    if chunksize:
        i = iter(content)
        generator = (list(it.islice(i, chunksize)) for _ in it.count())
    else:
        generator = iter([content])

    return it.takewhile(bool, generator)

def gen_resources(records):
    """Gibt ein Dictionary aus, wenn die id des Datensatzes bereits existiert, andernfalls ein Zeilenobjekt.
    """
    ids = {item[0] for item in session.query(Posts.id)}

    for record in records:
        is_row = hasattr(record, 'to_dict')

        if is_row and record.id in ids:
            # Es ist eine Zeile, aber die id existiert bereits, also müssen wir sie
            # in ein Dictionary konvertieren, das den bestehenden Datensatz aktualisiert. Da es ein Duplikat ist,
            # auch True ausgeben
            yield record.to_dict(), True
        elif is_row:
            # Es ist eine Zeile und die id existiert nicht, also ist keine Konvertierung erforderlich.
            # Da es kein Duplikat ist, auch False ausgeben
            yield record, False
        elif record['id'] in ids:
            # Es ist ein Dictionary und die id existiert bereits, also ist keine Konvertierung erforderlich.
            # Da es ein Duplikat ist, auch True ausgeben
            yield record, True
        else:
            # Es ist ein Dictionary und die id existiert nicht, also müssen wir es konvertieren.
            # Da es kein Duplikat ist, auch False ausgeben
            yield Posts(**record), False

Und schließlich die upsert Funktion

def upsert(data, chunksize=None):
    for records in chunk(data, chunksize):
        resources = gen_resources(records)
        sorted_resources = sorted(resources, key=itemgetter(1))

        for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
            items = [g[0] for g in group]

            if dupe:
                _upsert = partial(session.bulk_update_mappings, Posts)
            else:
                _upsert = session.add_all

            try:
                _upsert(items)
                session.commit()
            except IntegrityError:
                # Ein Datensatz wurde nach unserer Prüfung hinzugefügt oder gelöscht, also erneut versuchen
                # 
                # entsprechend anpassen, indem zusätzliche Ausnahmen hinzugefügt werden, z.B.,
                # except (IntegrityError, ValidationError, ValueError)
                db.session.rollback()
                upsert(items)
            except Exception as e:
                # Ein anderer Fehler ist aufgetreten, also die Chunkgröße reduzieren, um die
                # fehlerhafte(n) Zeile(n) zu isolieren
                db.session.rollback()
                num_items = len(items)

                if num_items > 1:
                    upsert(items, num_items // 2)
                else:
                    print('Fehler beim Hinzufügen des Datensatzes {}'.format(items[0]))

So wird es benutzt

>>> data = [
...     {'id': 1, 'text': 'aktuallisierter post1'}, 
...     {'id': 5, 'text': 'aktuallisierter post5'}, 
...     {'id': 1000, 'text': 'neuer post1000'}]
... 
>>> upsert(data)

Der Vorteil gegenüber bulk_save_objects ist, dass es Beziehungen, Fehlerüberprüfung usw. beim Einfügen behandeln kann (im Gegensatz zu Bulk-Operationen).

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X