32 Stimmen

Überschreiben der Datei im Zip-Archiv

Ich habe archive.zip mit zwei Dateien: hello.txt und world.txt

Ich möchte die Datei hello.txt mit der folgenden Codezeile überschreiben:

import zipfile

z = zipfile.ZipFile('archive.zip','a')
z.write('hello.txt')
z.close()  

aber es überschreibt die Datei nicht, es erstellt irgendwie eine weitere Instanz von hello.txt - siehe sich das Winzip-Screenshot an:

Alternativer Text

Da es keine Funktion wie zipfile.remove() gibt, wie lässt sich dieses Problem am besten lösen?

44voto

nosklo Punkte 204121

Es gibt keine Möglichkeit, das mit dem Python-Zipfile-Modul zu tun. Sie müssen eine neue Zip-Datei erstellen und alles erneut vom ersten File sowie dem neuen modifizierten File neu komprimieren.

Im Folgenden finden Sie etwas Code, um genau das zu tun. Beachten Sie jedoch, dass es nicht effizient ist, da es alle Daten dekomprimiert und dann wieder komprimiert.

import tempfile
import zipfile
import shutil
import os

def remove_from_zip(zipfname, *filenames):
    tempdir = tempfile.mkdtemp()
    try:
        tempname = os.path.join(tempdir, 'new.zip')
        with zipfile.ZipFile(zipfname, 'r') as zipread:
            with zipfile.ZipFile(tempname, 'w') as zipwrite:
                for item in zipread.infolist():
                    if item.filename not in filenames:
                        data = zipread.read(item.filename)
                        zipwrite.writestr(item, data)
        shutil.move(tempname, zipfname)
    finally:
        shutil.rmtree(tempdir)

Verwendung:

remove_from_zip('archiv.zip', 'hallo.txt')
with zipfile.ZipFile('archiv.zip', 'a') as z:
    z.write('hallo.txt')

29voto

Or Weis Punkte 494

Building on nosklo's answer. UpdateableZipFile Eine Klasse, die von ZipFile erbt, das gleiche Interface beibehält, aber die Möglichkeit bietet, Dateien zu überschreiben (über writestr oder write) und Dateien zu entfernen.

import os
import shutil
import tempfile
from zipfile import ZipFile, ZIP_STORED, ZipInfo

class UpdateableZipFile(ZipFile):
    """
    Füge Löschen (über remove_file) und Aktualisieren (über die Methoden writestr und write) hinzu
    Um die Aktualisierungsfunktionen zu aktivieren, verwenden Sie UpdateableZipFile mit dem 'with statement',
    Bei __exit__ (wenn Aktualisierungen angewendet wurden) wird eine neue Zip-Datei die vorhandene mit den Aktualisierungen überschreiben
    """

    class DeleteMarker(object):
        pass

    def __init__(self, file, mode="r", compression=ZIP_STORED, allowZip64=False):
        # Basis initialisieren
        super(UpdateableZipFile, self).__init__(file, mode=mode,
                                                compression=compression,
                                                allowZip64=allowZip64)
        # Datei zum Überschreiben in zip verfolgen
        self._replace = {}
        # Ob das with statement aufgerufen wurde
        self._allow_updates = False

    def writestr(self, zinfo_or_arcname, bytes, compress_type=None):
        if isinstance(zinfo_or_arcname, ZipInfo):
            name = zinfo_or_arcname.filename
        else:
            name = zinfo_or_arcname
        # Wenn die Datei vorhanden ist und überschrieben werden muss,
        # markiere den Eintrag und erstelle eine temp-Datei dafür
        # wir erlauben dies nur, wenn das with statement verwendet wird
        if self._allow_updates and name in self.namelist():
            temp_file = self._replace[name] = self._replace.get(name,
                                                                tempfile.TemporaryFile())
            temp_file.write(bytes)
        # Ansonsten einfach normal verhalten
        else:
            super(UpdateableZipFile, self).writestr(zinfo_or_arcname,
                                                    bytes, compress_type=compress_type)

    def write(self, filename, arcname=None, compress_type=None):
        arcname = arcname or filename
        # Wenn die Datei vorhanden ist und überschrieben werden muss,
        # markiere den Eintrag und erstelle eine temp-Datei dafür
        # wir erlauben dies nur, wenn das with statement verwendet wird
        if self._allow_updates and arcname in self.namelist():
            temp_file = self._replace[arcname] = self._replace.get(arcname,
                                                                   tempfile.TemporaryFile())
            with open(filename, "rb") as source:
                shutil.copyfileobj(source, temp_file)
        # Ansonsten einfach normal verhalten
        else:
            super(UpdateableZipFile, self).write(filename, 
                                                 arcname=arcname, compress_type=compress_type)

    def __enter__(self):
        # Aktualisierungen erlauben
        self._allow_updates = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # rufe die Basis auf, um die Zip-Datei organisch zu schließen
        try:
            super(UpdateableZipFile, self).__exit__(exc_type, exc_val, exc_tb)
            if len(self._replace) > 0:
                self._rebuild_zip()
        finally:
            # Falls der Wiederaufbau der Zip fehlschlägt,
            # Stellen Sie sicher, dass alle temp-Dateien freigegeben werden
            self._close_all_temp_files()
            self._allow_updates = False

    def _close_all_temp_files(self):
        for temp_file in self._replace.itervalues():
            if hasattr(temp_file, 'close'):
                temp_file.close()

    def remove_file(self, path):
        self._replace[path] = self.DeleteMarker()

    def _rebuild_zip(self):
        tempdir = tempfile.mkdtemp()
        try:
            temp_zip_path = os.path.join(tempdir, 'new.zip')
            with ZipFile(self.filename, 'r') as zip_read:
                # Erstelle eine neue Zip-Datei mit zugewiesenen Eigenschaften
                with ZipFile(temp_zip_path, 'w', compression=self.compression,
                             allowZip64=self._allowZip64) as zip_write:
                    for item in zip_read.infolist():
                        # Überprüfen, ob die Datei ersetzt werden soll oder gelöscht werden soll
                        replacement = self._replace.get(item.filename, None)
                        # Wenn Markierung für Löschung, Datei nicht in die neue Zip-Datei kopieren
                        if isinstance(replacement, self.DeleteMarker):
                            del self._replace[item.filename]
                            continue
                        # Wenn Markierung für den Ersatz, temp-Datei statt der alten Datei kopieren
                        elif replacement is not None:
                            del self._replace[item.filename]
                            # Ersetzung in Archiv schreiben,
                            # und dann schließen (temp-Datei löschen)
                            replacement.seek(0)
                            data = replacement.read()
                            replacement.close()
                        else:
                            data = zip_read.read(item.filename)
                        zip_write.writestr(item, data)
            # Archiv mit der aktualisierten überschreiben
            shutil.move(temp_zip_path, self.filename)
        finally:
            shutil.rmtree(tempdir)

Beispiel für die Verwendung:

with UpdateableZipFile("C:\Temp\Test2.docx", "a") as o:
    # Eine Datei mit einem String überschreiben
    o.writestr("word/document.xml", "Einige Daten")
    # Eine vorhandene Datei vom Zip ausschließen
    o.remove_file("word/fontTable.xml")
    # Eine neue Datei (ohne Konflikt) in das Zip schreiben
    o.writestr("neue_datei", "weitere Daten")
    # Eine Datei mit einer Datei überschreiben
    o.write(r"C:\Temp\example.png", "word/settings.xml")

0voto

Patrick Shechet Punkte 21

Meine Lösung ähnelt den anderen Antworten, verwendet jedoch SQLite zur Verwaltung der Zwischendateien und bietet __getitem__, __setitem__ und __delitem__ für eine einfache Schnittstelle. Standardmäßig ist die Datenbank im Arbeitsspeicher, aber Sie können einen temporären Dateipfad angeben, wenn Sie einen Zip-Dateiname haben, der größer als der verfügbare Speicher ist. Und natürlich ist SQLite in Python integriert und schneller als das Dateisystem

import sqlite3
import subprocess
import zipfile
from pathlib import Path

from sql import CREATE_TABLE, DELETE_FILE, INSERT_FILE, SELECT_CONTENT

class EditableZip:
    """Gedacht, um das Bearbeiten von Dateien innerhalb eines Zip-Archivs einfach zu machen, ist diese Klasse in der Lage, Dateien aus einer Zip-Datei in eine SQLite-Datenbank zu laden, erleichtert das Bearbeiten/Löschen/Hinzufügen von Dateien und das Speichern in einer Zip-Datei.
    Die Datenbank kann im Arbeitsspeicher (Standard) oder in einer temporären Datei auf der Festplatte sein, wenn
    temp_db_path angegeben wird.

    Wenn eine Datei auf der Festplatte verwendet wird, kann EditableZip.close aufgerufen werden, um die Datei zu entfernen, oder EditableZip
    kann als Kontext-Manager verwendet werden.

    Wenn auto_save auf True gesetzt ist und ein ursprünglicher zip_path bereitgestellt wurde, wird die Datei
    überschrieben, wenn EditableZip geschlossen wird. Wenn Sie die Datei in eine andere Datei speichern möchten,
    oder kein zip_path bei der Instanziierung verwendet wird, kann auto_save einen Dateipfad enthalten.

    Dateien können durch Elementzuweisung hinzugefügt werden
    mit EditableZip(auto_save="beispiel.zip") as ez:
        ez["ding.txt"] = "kram"
        # leeres Verzeichnis
        ez["leer/"] = None

    Die Zuweisung akzeptiert Nicht-Text-Dateien als Bytes.

    EditableZip ist durch Indexierung zugreifbar. Wenn der Index ein Pfad in der Datenbank ist, werden die Daten zurückgegeben.

    Auf EditableZip.files kann zugegriffen werden, um über Dateien in der Datenbank zu iterieren.
    """

    def __init__(
        self,
        zip_path: None | str | Path = None,
        temp_db_path: None | Path = None,
        auto_save: bool | str | Path = False,
    ):
        self.temp_db_path, self.auto_save, self.file_path = (
            temp_db_path,
            auto_save,
            zip_path,
        )
        self.db = sqlite3.connect(
            str(temp_db_path if temp_db_path is not None else ":memory:")
        )
        self.db.execute(CREATE_TABLE)

        if self.file_path:
            self.load(self.file_path)

    @property
    def files(self):
        "Gibt einen Generator aller Dateipfade in der Datenbank zurück."
        try:
            return (
                i[0] for i in self.db.execute("SELECT file_path FROM files").fetchall()
            )
        except TypeError:
            return None

    def load(self, zip_path: str | Path) -> None:
        "Fügt alle Dateien aus der Zip-Datei bei zip_path zur Datenbank hinzu."
        with zipfile.ZipFile(zip_path, mode="r") as archive:
            for item in archive.infolist():
                self[item.filename] = (
                    None if item.filename[-1] == "/" else archive.read(item)
                )

    def save(self, zip_path: None | str | Path) -> Path:
        "Speichert alle Dateien aus der Datenbank in der Zip-Datei bei zip_path."
        zip_path = self.file_path if zip_path is None else zip_path
        with zipfile.ZipFile(zip_path, "w") as archive:
            for file in self.files:
                if file_data := self.fetch(file):
                    archive.writestr(file, file_data)
                else:
                    archive.writestr(zipfile.ZipInfo(file), "")
        return zip_path

    def close(self):
        "Automatisches Speichern bei Bedarf und Schließen + Entfernen der Datenbank."
        if self.auto_save:
            self.save(
                zip_path=self.auto_save
                if isinstance(self.auto_save, (str, Path))
                else None
            )
        self.db.close()
        if isinstance(self.temp_db_path, Path):
            self.temp_db_path.unlink(missing_ok=True)

    def fetch(self, file_path: str) -> bytes:
        "Holen Sie sich den Inhalt der Datei für file_path aus der Datenbank."
        try:
            return self.db.execute(SELECT_CONTENT, {"file_path": file_path}).fetchone()[
                0
            ]
        except TypeError:
            return None

    def __getitem__(self, key):
        result = self.fetch(key)
        try:
            return result.decode("utf-8")
        except AttributeError:
            return result

    def __setitem__(self, file_path, content: str | bytes):
        if isinstance(content, str):
            content = content.encode("utf-8")
        self.db.execute(
            INSERT_FILE,
            {"file_path": file_path, "file_content": content},
        )

    def __delitem__(self, file_path):
        self.db.execute(DELETE_FILE, {"file_path": file_path})

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

if __name__ == "__main__":
    # Ein Anwendungsfall: Bearbeiten von epub-Dateien.
    # Dateiquelle:
    # https://archiveofourown.org/downloads/13795605/Victoria%20Potter%20and%20the.epub?updated_at=1650231615
    file_path = Path("Victoria Potter and the.epub")
    new_file = (file_path.parent / (file_path.stem + "- lowercase")).with_suffix(
        file_path.suffix
    )

    # Erstellen Sie eine Kopie des epubs mit allen Buchstaben in Kleinbuchstaben
    with EditableZip(zip_path=file_path, auto_save=new_file) as ez:
        for file in ez.files:
            if Path(file).suffix in [".html", ".xhtml"]:
                ez[file] = ez[file].lower()

-1voto

Kyrylo Kravets Punkte 135

Referenz: Datei aus Zip-Datei mit dem ZipFile-Modul löschen

Kurz gesagt,

Sie können den Code von https://github.com/python/cpython/blob/659eb048cc9cac73c46349eb29845bc5cd630f09/Lib/zipfile.py übernehmen und eine separate Datei daraus erstellen. Danach verweisen Sie einfach darauf von Ihrem Projekt anstelle der integrierten Python-Bibliothek: import myproject.zipfile as zipfile.

Verwendung:

with zipfile.ZipFile(f"archiv.zip", "a") as z:
    z.remove(f"erstefile.txt")

CodeJaeger.com

CodeJaeger ist eine Gemeinschaft für Programmierer, die täglich Hilfe erhalten..
Wir haben viele Inhalte, und Sie können auch Ihre eigenen Fragen stellen oder die Fragen anderer Leute lösen.

Powered by:

X