144 Stimmen

Wie führe ich ein Upsert mit SqlAlchemy durch?

Ich habe einen Datensatz, von dem ich möchte, dass er in der Datenbank vorhanden ist, wenn er nicht vorhanden ist, und wenn er bereits vorhanden ist (Primärschlüssel vorhanden), möchte ich, dass die Felder auf den aktuellen Stand aktualisiert werden. Dies wird oft als upsert .

Der folgende unvollständige Codeschnipsel zeigt, wie es geht, aber er scheint zu klobig zu sein (vor allem, wenn es viel mehr Spalten gäbe). Was ist der bessere/beste Weg?

Base = declarative_base()
class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))
    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc

def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desiredDefault.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()

Gibt es einen besseren oder weniger ausführlichen Weg, dies zu tun? Etwas in dieser Art wäre großartig:

sess.upsert_this(desired_default, unique_key = "name")

obwohl die unique_key kwarg ist offensichtlich unnötig (der ORM sollte in der Lage sein, dies leicht herauszufinden). Ich habe es nur hinzugefügt, weil SQLAlchemy dazu neigt, nur mit dem Primärschlüssel zu arbeiten. z.B.: Ich habe untersucht, ob Session.merge wäre anwendbar, aber das funktioniert nur mit dem Primärschlüssel, der in diesem Fall eine automatisch inkrementierende ID ist, die für diesen Zweck nicht sehr nützlich ist.

Ein Beispiel für einen Anwendungsfall ist das Starten einer Serveranwendung, die ihre standardmäßig erwarteten Daten aktualisiert hat, d. h. es gibt keine Gleichzeitigkeitsprobleme bei dieser Aktualisierung.

85voto

P.R. Punkte 3347

SQLAlchemy unterstützt ON CONFLICT mit zwei Methoden on_conflict_do_update() y on_conflict_do_nothing() .

Kopieren von 文書化 :

from sqlalchemy.dialects.postgresql import insert

stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.user_email],
    index_where=my_table.c.user_email.like('%@gmail.com'),
    set_=dict(data=stmt.excluded.data)
)
conn.execute(stmt)

80voto

wberry Punkte 17277

SQLAlchemy hat ein "Speichern-oder-Aktualisieren"-Verhalten, das in neueren Versionen in session.add aber vorher war die separate session.saveorupdate anrufen. Dies ist kein "Upsert", kann aber für Ihre Bedürfnisse ausreichend sein.

Es ist gut, dass Sie nach einer Klasse mit mehreren eindeutigen Schlüsseln fragen; ich glaube, das ist genau der Grund dafür, dass es keinen einzigen richtigen Weg gibt, dies zu tun. Der Primärschlüssel ist auch ein eindeutiger Schlüssel. Wenn es keine eindeutigen Beschränkungen gäbe, sondern nur den Primärschlüssel, wäre das Problem einfach genug: Wenn nichts mit der angegebenen ID existiert oder die ID gleich Null ist, erstellen Sie einen neuen Datensatz; andernfalls aktualisieren Sie alle anderen Felder im vorhandenen Datensatz mit diesem Primärschlüssel.

Wenn es jedoch zusätzliche eindeutige Beschränkungen gibt, ergeben sich logische Probleme mit diesem einfachen Ansatz. Wenn Sie ein Objekt "einfügen" wollen und der Primärschlüssel Ihres Objekts mit einem vorhandenen Datensatz übereinstimmt, aber eine andere eindeutige Spalte mit einer verschiedene aufzeichnen, was tun Sie dann? Ähnlich verhält es sich, wenn der Primärschlüssel keinem vorhandenen Datensatz entspricht, aber eine andere eindeutige Spalte tut mit einem bestehenden Datensatz übereinstimmen, was dann? Vielleicht gibt es eine richtige Antwort für Ihre spezielle Situation, aber im Allgemeinen würde ich sagen, dass es keine einzig richtige Antwort gibt.

Das wäre der Grund, warum es keinen eingebauten "Upsert"-Vorgang gibt. Die Anwendung muss definieren, was dies in jedem einzelnen Fall bedeutet.

35voto

NirIzr Punkte 2821

Heutzutage bietet SQLAlchemy zwei hilfreiche Funktionen on_conflict_do_nothing y on_conflict_do_update . Diese Funktionen sind nützlich, erfordern aber, dass Sie von der ORM-Schnittstelle auf die darunter liegende umschalten. SQLAlchemy-Kern .

Obwohl diese beiden Funktionen das Upserting mit der SQLAlchemy-Syntax nicht so schwierig machen, sind diese Funktionen weit davon entfernt, eine vollständige Out-of-the-Box-Lösung für das Upserting zu bieten.

Mein häufiger Anwendungsfall ist das Upserting einer großen Anzahl von Zeilen in einer einzigen SQL-Abfrage/Sitzungsausführung. Beim Upserting stoße ich normalerweise auf zwei Probleme:

So fehlen zum Beispiel ORM-Funktionen auf höherer Ebene, an die wir uns gewöhnt haben. Sie können keine ORM-Objekte verwenden, sondern müssen stattdessen ForeignKey s zum Zeitpunkt der Einfügung.

Ich benutze este Die folgende Funktion habe ich geschrieben, um diese beiden Probleme zu lösen:

def upsert(session, model, rows):
    table = model.__table__
    stmt = postgresql.insert(table)
    primary_keys = [key.name for key in inspect(table).primary_key]
    update_dict = {c.name: c for c in stmt.excluded if not c.primary_key}

    if not update_dict:
        raise ValueError("insert_or_update resulted in an empty update_dict")

    stmt = stmt.on_conflict_do_update(index_elements=primary_keys,
                                      set_=update_dict)

    seen = set()
    foreign_keys = {col.name: list(col.foreign_keys)[0].column for col in table.columns if col.foreign_keys}
    unique_constraints = [c for c in table.constraints if isinstance(c, UniqueConstraint)]
    def handle_foreignkeys_constraints(row):
        for c_name, c_value in foreign_keys.items():
            foreign_obj = row.pop(c_value.table.name, None)
            row[c_name] = getattr(foreign_obj, c_value.name) if foreign_obj else None

        for const in unique_constraints:
            unique = tuple([const,] + [row[col.name] for col in const.columns])
            if unique in seen:
                return None
            seen.add(unique)

        return row

    rows = list(filter(None, (handle_foreignkeys_constraints(row) for row in rows)))
    session.execute(stmt, rows)

14voto

Ben Punkte 5323

Ich verfolge den Ansatz "erst schauen, dann springen":

# first get the object from the database if it exists
# we're guaranteed to only get one or zero results
# because we're filtering by primary key
switch_command = session.query(Switch_Command).\
    filter(Switch_Command.switch_id == switch.id).\
    filter(Switch_Command.command_id == command.id).first()

# If we didn't get anything, make one
if not switch_command:
    switch_command = Switch_Command(switch_id=switch.id, command_id=command.id)

# update the stuff we care about
switch_command.output = 'Hooray!'
switch_command.lastseen = datetime.datetime.utcnow()

session.add(switch_command)
# This will generate either an INSERT or UPDATE
# depending on whether we have a new object or not
session.commit()

Der Vorteil ist, dass dies db-neutral ist und ich denke, es ist klar zu lesen. Der Nachteil ist, dass es ein potenzielles Rennbedingung in einem Szenario wie dem folgenden:

  • fragen wir die Datenbank nach einem switch_command und keine finden
  • erstellen wir eine switch_command
  • Ein anderer Prozess oder Thread erstellt eine switch_command mit demselben Primärschlüssel wie dem unseren
  • versuchen wir, unsere switch_command

4voto

Mikko Ohtamaa Punkte 75407

Es gibt mehrere Antworten und hier kommt eine weitere Antwort (YAA). Andere Antworten sind aufgrund der beteiligten Metaprogrammierung nicht so gut lesbar. Hier ist ein Beispiel, das

  • Verwendet SQLAlchemy ORM

  • Zeigt, wie man eine Zeile erstellt, wenn es keine Zeilen gibt, indem man on_conflict_do_nothing

  • Zeigt, wie man die vorhandene Zeile (falls vorhanden) aktualisiert, ohne eine neue Zeile zu erstellen, indem man on_conflict_do_update

  • Verwendet den Primärschlüssel der Tabelle als constraint

Ein längeres Beispiel in die ursprüngliche Frage, worauf sich dieser Code bezieht .

import sqlalchemy as sa
import sqlalchemy.orm as orm
from sqlalchemy import text
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

class PairState(Base):

    __tablename__ = "pair_state"

    # This table has 1-to-1 relationship with Pair
    pair_id = sa.Column(sa.ForeignKey("pair.id"), nullable=False, primary_key=True, unique=True)
    pair = orm.relationship(Pair,
                        backref=orm.backref("pair_state",
                                        lazy="dynamic",
                                        cascade="all, delete-orphan",
                                        single_parent=True, ), )

    # First raw event in data stream
    first_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # Last raw event in data stream
    last_event_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    # The last hypertable entry added
    last_interval_at = sa.Column(sa.TIMESTAMP(timezone=True), nullable=False, server_default=text("TO_TIMESTAMP(0)"))

    @staticmethod
    def create_first_event_if_not_exist(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Sets the first event value if not exist yet."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, first_event_at=ts).
            on_conflict_do_nothing()
        )

    @staticmethod
    def update_last_event(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the the column last_event_at for a named pair."""
        # Based on the original example of https://stackoverflow.com/a/49917004/315168
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_event_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_event_at": ts})
        )

    @staticmethod
    def update_last_interval(dbsession: Session, pair_id: int, ts: datetime.datetime):
        """Replaces the the column last_interval_at for a named pair."""
        dbsession.execute(
            insert(PairState).
            values(pair_id=pair_id, last_interval_at=ts).
            on_conflict_do_update(constraint=PairState.__table__.primary_key, set_={"last_interval_at": ts})
        )

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X