144 Stimmen

Wie führe ich ein Upsert mit SqlAlchemy durch?

Ich habe einen Datensatz, von dem ich möchte, dass er in der Datenbank vorhanden ist, wenn er nicht vorhanden ist, und wenn er bereits vorhanden ist (Primärschlüssel vorhanden), möchte ich, dass die Felder auf den aktuellen Stand aktualisiert werden. Dies wird oft als upsert .

Der folgende unvollständige Codeschnipsel zeigt, wie es geht, aber er scheint zu klobig zu sein (vor allem, wenn es viel mehr Spalten gäbe). Was ist der bessere/beste Weg?

Base = declarative_base()
class Template(Base):
    __tablename__ = 'templates'
    id = Column(Integer, primary_key = True)
    name = Column(String(80), unique = True, index = True)
    template = Column(String(80), unique = True)
    description = Column(String(200))
    def __init__(self, Name, Template, Desc):
        self.name = Name
        self.template = Template
        self.description = Desc

def UpsertDefaultTemplate():
    sess = Session()
    desired_default = Template("default", "AABBCC", "This is the default template")
    try:
        q = sess.query(Template).filter_by(name = desiredDefault.name)
        existing_default = q.one()
    except sqlalchemy.orm.exc.NoResultFound:
        #default does not exist yet, so add it...
        sess.add(desired_default)
    else:
        #default already exists.  Make sure the values are what we want...
        assert isinstance(existing_default, Template)
        existing_default.name = desired_default.name
        existing_default.template = desired_default.template
        existing_default.description = desired_default.description
    sess.flush()

Gibt es einen besseren oder weniger ausführlichen Weg, dies zu tun? Etwas in dieser Art wäre großartig:

sess.upsert_this(desired_default, unique_key = "name")

obwohl die unique_key kwarg ist offensichtlich unnötig (der ORM sollte in der Lage sein, dies leicht herauszufinden). Ich habe es nur hinzugefügt, weil SQLAlchemy dazu neigt, nur mit dem Primärschlüssel zu arbeiten. z.B.: Ich habe untersucht, ob Session.merge wäre anwendbar, aber das funktioniert nur mit dem Primärschlüssel, der in diesem Fall eine automatisch inkrementierende ID ist, die für diesen Zweck nicht sehr nützlich ist.

Ein Beispiel für einen Anwendungsfall ist das Starten einer Serveranwendung, die ihre standardmäßig erwarteten Daten aktualisiert hat, d. h. es gibt keine Gleichzeitigkeitsprobleme bei dieser Aktualisierung.

3voto

Aditi Srivastava Punkte 111

Die unten funktioniert gut für mich mit Redshift-Datenbank und wird auch für kombinierte Primärschlüssel-Beschränkung arbeiten.

SOURCE : este

Für die Erstellung der SQLAlchemy-Engine in der Funktion sind nur wenige Änderungen erforderlich def start_engine()

from sqlalchemy import Column, Integer, Date ,Metadata
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects import postgresql

Base = declarative_base()

def start_engine():
    engine = create_engine(os.getenv('SQLALCHEMY_URI', 
    'postgresql://localhost:5432/upsert'))
     connect = engine.connect()
    meta = MetaData(bind=engine)
    meta.reflect(bind=engine)
    return engine

class DigitalSpend(Base):
    __tablename__ = 'digital_spend'
    report_date = Column(Date, nullable=False)
    day = Column(Date, nullable=False, primary_key=True)
    impressions = Column(Integer)
    conversions = Column(Integer)

    def __repr__(self):
        return str([getattr(self, c.name, None) for c in self.__table__.c])

def compile_query(query):
    compiler = query.compile if not hasattr(query, 'statement') else 
  query.statement.compile
    return compiler(dialect=postgresql.dialect())

def upsert(session, model, rows, as_of_date_col='report_date', no_update_cols=[]):
    table = model.__table__

    stmt = insert(table).values(rows)

    update_cols = [c.name for c in table.c
                   if c not in list(table.primary_key.columns)
                   and c.name not in no_update_cols]

    on_conflict_stmt = stmt.on_conflict_do_update(
        index_elements=table.primary_key.columns,
        set_={k: getattr(stmt.excluded, k) for k in update_cols},
        index_where=(getattr(model, as_of_date_col) < getattr(stmt.excluded, as_of_date_col))
        )

    print(compile_query(on_conflict_stmt))
    session.execute(on_conflict_stmt)

session = start_engine()
upsert(session, DigitalSpend, initial_rows, no_update_cols=['conversions'])

2voto

Schalton Punkte 2465

Dies ermöglicht den Zugriff auf die zugrundeliegenden Modelle auf der Grundlage von Stringnamen

def get_class_by_tablename(tablename):
  """Return class reference mapped to table.
  https://stackoverflow.com/questions/11668355/sqlalchemy-get-model-from-table-name-this-may-imply-appending-some-function-to
  :param tablename: String with name of table.
  :return: Class reference or None.
  """
  for c in Base._decl_class_registry.values():
    if hasattr(c, '__tablename__') and c.__tablename__ == tablename:
      return c

sqla_tbl = get_class_by_tablename(table_name)

def handle_upsert(record_dict, table):
    """
    handles updates when there are primary key conflicts

    """
    try:
        self.active_session().add(table(**record_dict))
    except:
        # Here we'll assume the error is caused by an integrity error
        # We do this because the error classes are passed from the
        # underlying package (pyodbc / sqllite) SQLAlchemy doesn't mask
        # them with it's own code - this should be updated to have
        # explicit error handling for each new db engine

        # <update>add explicit error handling for each db engine</update> 
        active_session.rollback()
        # Query for conflic class, use update method to change values based on dict
        c_tbl_primary_keys = [i.name for i in table.__table__.primary_key] # List of primary key col names
        c_tbl_cols = dict(sqla_tbl.__table__.columns) # String:Col Object crosswalk

        c_query_dict = {k:record_dict[k] for k in c_tbl_primary_keys if k in record_dict} # sub-dict from data of primary key:values
        c_oo_query_dict = {c_tbl_cols[k]:v for (k,v) in c_query_dict.items()} # col-object:query value for primary key cols

        c_target_record = session.query(sqla_tbl).filter(*[k==v for (k,v) in oo_query_dict.items()]).first()

        # apply new data values to the existing record
        for k, v in record_dict.items()
            setattr(c_target_record, k, v)

1voto

ThePsyjo Punkte 39

Das funktioniert bei mir mit sqlite3 und postgres. Allerdings könnte es mit kombinierten Primärschlüssel-Beschränkungen fehlschlagen und wird höchstwahrscheinlich mit zusätzlichen eindeutigen Beschränkungen fehlschlagen.

    try:
        t = self._meta.tables[data['table']]
    except KeyError:
        self._log.error('table "%s" unknown', data['table'])
        return

    try:
        q = insert(t, values=data['values'])
        self._log.debug(q)
        self._db.execute(q)
    except IntegrityError:
        self._log.warning('integrity error')
        where_clause = [c.__eq__(data['values'][c.name]) for c in t.c if c.primary_key]
        update_dict = {c.name: data['values'][c.name] for c in t.c if not c.primary_key}
        q = update(t, values=update_dict).where(*where_clause)
        self._log.debug(q)
        self._db.execute(q)
    except Exception as e:
        self._log.error('%s: %s', t.name, e)

1voto

E.ws Punkte 335

Da wir Probleme mit generierten Default-ids und Referenzen hatten, die zu ForeignKeyViolation-Fehlern führten wie

update or delete on table "..." violates foreign key constraint
Key (id)=(...) is still referenced from table "...".

mussten wir die id für das Aktualisierungsdiktat ausschließen, da sie sonst immer als neuer Standardwert erzeugt wird.

Darüber hinaus gibt die Methode die erstellte/aktualisierte Entität zurück.

from sqlalchemy.dialects.postgresql import insert # Important to use the postgresql insert

def upsert(session, data, key_columns, model):

    stmt = insert(model).values(data)

    # Important to exclude the ID for update!
    exclude_for_update = [model.id.name, *key_columns]
    update_dict = {c.name: c for c in stmt.excluded if c.name not in exclude_for_update}

    stmt = stmt.on_conflict_do_update(
        index_elements=key_columns,
        set_=update_dict
    ).returning(model)

    orm_stmt = (
        select(model)
        .from_statement(stmt)
        .execution_options(populate_existing=True)
    )

    return session.execute(orm_stmt).scalar()

Exemple :

class UpsertUser(Base):
    __tablename__ = 'upsert_user'
    id = Column(Id, primary_key=True, default=uuid.uuid4)
    name: str = Column(sa.String, nullable=False)
    user_sid: str = Column(sa.String, nullable=False, unique=True)
    house_admin = relationship('UpsertHouse', back_populates='admin', uselist=False)

class UpsertHouse(Base):
    __tablename__ = 'upsert_house'
    id = Column(Id, primary_key=True, default=uuid.uuid4)
    admin_id: Id = Column(Id, ForeignKey('upsert_user.id'), nullable=False)
    admin: UpsertUser = relationship('UpsertUser', back_populates='house_admin', uselist=False)

# Usage

upserted_user = upsert(session, updated_user, [UpsertUser.user_sid.name], UpsertUser)

Hinweis: Nur auf Postgresql getestet, könnte aber auch für andere DBs funktionieren, die ON DUPLICATE KEY UPDATE unterstützen, z.B. MySQL

0voto

benhengx Punkte 123

Im Falle von sqlite wird die sqlite_on_conflict='REPLACE' Option kann bei der Definition einer UniqueConstraint y sqlite_on_conflict_unique für die eindeutige Beschränkung auf eine einzelne Spalte. Dann session.add funktioniert in etwa so wie upsert . Siehe die offizielle Dokumentation .

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X