Seit diese Frage geschlossen wurde, poste ich hier, wie man es mit SQLAlchemy macht. Über Rekursion versucht es, einen Bulk-Insert oder -Update gegen Rennbedingungen und Validierungsfehler zu bekämpfen.
Zuerst die Imports
import itertools as it
from functools import partial
from operator import itemgetter
from sqlalchemy.exc import IntegrityError
from app import session
from models import Posts
Jetzt ein paar Hilfsfunktionen
def chunk(content, chunksize=None):
"""Gruppiert Daten in Chunks, jeder mit (maximal) `chunksize` Elementen.
https://stackoverflow.com/a/22919323/408556
"""
if chunksize:
i = iter(content)
generator = (list(it.islice(i, chunksize)) for _ in it.count())
else:
generator = iter([content])
return it.takewhile(bool, generator)
def gen_resources(records):
"""Gibt ein Dictionary aus, wenn die id des Datensatzes bereits existiert, andernfalls ein Zeilenobjekt.
"""
ids = {item[0] for item in session.query(Posts.id)}
for record in records:
is_row = hasattr(record, 'to_dict')
if is_row and record.id in ids:
# Es ist eine Zeile, aber die id existiert bereits, also müssen wir sie
# in ein Dictionary konvertieren, das den bestehenden Datensatz aktualisiert. Da es ein Duplikat ist,
# auch True ausgeben
yield record.to_dict(), True
elif is_row:
# Es ist eine Zeile und die id existiert nicht, also ist keine Konvertierung erforderlich.
# Da es kein Duplikat ist, auch False ausgeben
yield record, False
elif record['id'] in ids:
# Es ist ein Dictionary und die id existiert bereits, also ist keine Konvertierung erforderlich.
# Da es ein Duplikat ist, auch True ausgeben
yield record, True
else:
# Es ist ein Dictionary und die id existiert nicht, also müssen wir es konvertieren.
# Da es kein Duplikat ist, auch False ausgeben
yield Posts(**record), False
Und schließlich die upsert Funktion
def upsert(data, chunksize=None):
for records in chunk(data, chunksize):
resources = gen_resources(records)
sorted_resources = sorted(resources, key=itemgetter(1))
for dupe, group in it.groupby(sorted_resources, itemgetter(1)):
items = [g[0] for g in group]
if dupe:
_upsert = partial(session.bulk_update_mappings, Posts)
else:
_upsert = session.add_all
try:
_upsert(items)
session.commit()
except IntegrityError:
# Ein Datensatz wurde nach unserer Prüfung hinzugefügt oder gelöscht, also erneut versuchen
#
# entsprechend anpassen, indem zusätzliche Ausnahmen hinzugefügt werden, z.B.,
# except (IntegrityError, ValidationError, ValueError)
db.session.rollback()
upsert(items)
except Exception as e:
# Ein anderer Fehler ist aufgetreten, also die Chunkgröße reduzieren, um die
# fehlerhafte(n) Zeile(n) zu isolieren
db.session.rollback()
num_items = len(items)
if num_items > 1:
upsert(items, num_items // 2)
else:
print('Fehler beim Hinzufügen des Datensatzes {}'.format(items[0]))
So wird es benutzt
>>> data = [
... {'id': 1, 'text': 'aktuallisierter post1'},
... {'id': 5, 'text': 'aktuallisierter post5'},
... {'id': 1000, 'text': 'neuer post1000'}]
...
>>> upsert(data)
Der Vorteil gegenüber bulk_save_objects
ist, dass es Beziehungen, Fehlerüberprüfung usw. beim Einfügen behandeln kann (im Gegensatz zu Bulk-Operationen).