# hachoir_parser/archive/sevenzip.py

"""
7zip file parser

Information:
- File 7zformat.txt of 7-zip SDK:
  http://www.7-zip.org/sdk.html

Author: Olivier SCHWAB
Creation date: 6 December 2006
"""

from hachoir_parser import Parser
from hachoir_core.field import (Field, FieldSet, ParserError,
    GenericVector,
    Enum, UInt8, UInt32, UInt64,
    Bytes, RawBytes)
from hachoir_core.endian import LITTLE_ENDIAN
from hachoir_core.text_handler import textHandler, hexadecimal, filesizeHandler

class SZUInt64(Field):
    """
    Variable-length unsigned integer (up to 64 bits).

    The first byte carries a run of leading 1 bits, one per extra byte that
    follows; the bits below the first 0 bit hold the most significant part
    of the value.  The extra bytes are stored least significant first.

    The field size starts at 8 bits and grows by 8 for every extra byte
    consumed.  ``max_size`` is accepted but not used by this implementation.
    """
    def __init__(self, parent, name, max_size=None, description=None):
        Field.__init__(self, parent, name, size=8, description=description)
        stream = parent.stream
        position = self.absolute_address
        lead = stream.readBits(position, 8, LITTLE_ENDIAN)
        result = 0
        marker = 0x80
        # shift is the bit offset of the byte handled in this iteration
        for shift in xrange(0, 64, 8):
            position += 8
            if (lead & marker) == 0:
                # First clear marker bit: what remains of the lead byte
                # below it is the high part of the value.
                result += (lead & (marker - 1)) << shift
                break
            extra = stream.readBits(position, 8, LITTLE_ENDIAN)
            result |= extra << shift
            marker >>= 1
            self._size += 8
        self.createValue = lambda: result

# Property identifiers used as one-byte markers throughout the header
# structures.  They are numbered consecutively from 0 in the order listed.
# ``range`` (rather than the Python-2-only ``xrange``) keeps this
# module-level statement importable on both Python 2 and Python 3; the
# unpacked values are identical.
(ID_END, ID_HEADER, ID_ARCHIVE_PROPS, ID_ADD_STREAM_INFO, ID_MAIN_STREAM_INFO,
 ID_FILES_INFO, ID_PACK_INFO, ID_UNPACK_INFO, ID_SUBSTREAMS_INFO, ID_SIZE,
 ID_CRC, ID_FOLDER, ID_CODERS_UNPACK_SIZE, ID_NUM_UNPACK_STREAMS,
 ID_EMPTY_STREAM, ID_EMPTY_FILE, ID_ANTI, ID_NAME, ID_CREATION_TIME,
 ID_LAST_ACCESS_TIME, ID_LAST_WRITE_TIME, ID_WIN_ATTR, ID_COMMENT,
 ID_ENCODED_HEADER) = range(24)

# Human-readable label for each identifier (used as the Enum mapping and in
# log messages).
ID_INFO = {
    ID_END               : "End",
    ID_HEADER            : "Header embedding another one",
    ID_ARCHIVE_PROPS     : "Archive Properties",
    ID_ADD_STREAM_INFO   : "Additional Streams Info",
    ID_MAIN_STREAM_INFO  : "Main Streams Info",
    ID_FILES_INFO        : "Files Info",
    ID_PACK_INFO         : "Pack Info",
    ID_UNPACK_INFO       : "Unpack Info",
    ID_SUBSTREAMS_INFO   : "Substreams Info",
    ID_SIZE              : "Size",
    ID_CRC               : "CRC",
    ID_FOLDER            : "Folder",
    ID_CODERS_UNPACK_SIZE: "Coders Unpacked size",
    ID_NUM_UNPACK_STREAMS: "Number of Unpacked Streams",
    ID_EMPTY_STREAM      : "Empty Stream",
    ID_EMPTY_FILE        : "Empty File",
    ID_ANTI              : "Anti",
    ID_NAME              : "Name",
    ID_CREATION_TIME     : "Creation Time",
    ID_LAST_ACCESS_TIME  : "Last Access Time",
    ID_LAST_WRITE_TIME   : "Last Write Time",
    ID_WIN_ATTR          : "Win Attributes",
    ID_COMMENT           : "Comment",
    ID_ENCODED_HEADER    : "Header holding encoded data info",
}

class SkippedData(FieldSet):
    """
    Property that is not interpreted: an identifier byte, a variable-length
    byte count and, when that count is non-zero, the payload as raw bytes.
    """
    def createFields(self):
        yield Enum(UInt8(self, "id[]"), ID_INFO)
        length_field = SZUInt64(self, "size")
        yield length_field
        payload_length = length_field.value
        if payload_length > 0:
            yield RawBytes(self, "data", payload_length)

def waitForID(s, wait_id, wait_name="waited_id[]"):
    """
    Generate fields of field set ``s`` until the identifier ``wait_id`` is met.

    The next byte of ``s`` is peeked at each step.  Any other identifier is
    emitted as a SkippedData block; the expected one is emitted as an
    Enum/UInt8 field named ``wait_name`` and the generator then stops.
    Nothing more is produced once ``s`` reaches its end.
    """
    while not s.eof:
        position = s.absolute_address + s.current_size
        found = s.stream.readBits(position, 8, LITTLE_ENDIAN)
        if found != wait_id:
            s.info("Skipping ID %u!=%u" % (found, wait_id))
            yield SkippedData(s, "skipped_id[]", "%u != %u" % (found, wait_id))
            continue
        yield Enum(UInt8(s, wait_name), ID_INFO)
        s.info("Found ID %s (%u)" % (ID_INFO[found], found))
        return

class HashDigest(FieldSet):
    """
    Digest block: an identifier byte, ``num_digests`` one-byte "defined"
    flags, then a 32-bit hash for every digest whose flag is non-zero.

    ``num_digests`` is supplied by the caller; when it is zero (or less)
    only the identifier byte is produced.
    """
    def __init__(self, parent, name, num_digests, desc=None):
        FieldSet.__init__(self, parent, name, desc)
        self.num_digests = num_digests

    def createFields(self):
        yield Enum(UInt8(self, "id"), ID_INFO)
        if self.num_digests <= 0:
            return
        # Peek at the flags where the vector is about to be parsed, i.e.
        # after the identifier byte that has already been consumed.
        flags_addr = self.absolute_address + self.current_size
        defined = self.stream.readBytes(flags_addr, self.num_digests)
        # NOTE(review): 7zFormat.txt appears to describe a single
        # "AllAreDefined" byte followed by an optional bit vector rather than
        # one byte per digest -- TODO confirm against the specification.
        yield GenericVector(self, "defined[]", self.num_digests, UInt8, "bool")
        for index in xrange(self.num_digests):
            # Compare one-byte slices: a one-character string is always
            # truthy, even when it holds a NUL byte.
            if defined[index:index+1] != "\0":
                yield textHandler(UInt32(self, "hash[]",
                    "Hash for digest %u" % index), hexadecimal)

class PackInfo(FieldSet):
    """
    Pack information: identifier, position of the packed data, number of
    pack streams, the size of each stream, then optional digests up to the
    end marker.
    """
    def createFields(self):
        yield Enum(UInt8(self, "id"), ID_INFO)
        # Very important, helps determine where the data is.
        # The text is passed by keyword: the third positional parameter of
        # SZUInt64 is max_size, not the description.
        yield SZUInt64(self, "pack_pos", description="Position of the packs")
        num = SZUInt64(self, "num_pack_streams")
        yield num
        num = num.value

        for field in waitForID(self, ID_SIZE, "size_marker"):
            yield field

        for index in xrange(num):
            yield SZUInt64(self, "pack_size[]")

        while not self.eof:
            addr = self.absolute_address+self.current_size
            uid = self.stream.readBits(addr, 8, LITTLE_ENDIAN)
            if uid == ID_END:
                yield Enum(UInt8(self, "end_marker"), ID_INFO)
                break
            elif uid == ID_CRC:
                # One digest per pack stream: pass the stream count, not the
                # last index of the size loop (which is also unbound when
                # there are no streams).
                yield HashDigest(self, "hash_digest", num)
            else:
                yield SkippedData(self, "skipped_data")

def lzmaParams(value):
    """
    Format the LZMA properties byte held by field ``value`` as text.

    The byte is decomposed as ``(pb * 5 + lp) * 9 + lc`` and returned as
    ``"lc=<lc> pb=<pb> lp=<lp>"``.
    """
    param = value.value
    # Floor division keeps the arithmetic integral on every Python version.
    remainder = param // 9
    # Literal coder context bits
    lc = param % 9
    # Position state bits
    pb = remainder // 5
    # Literal coder position bits
    lp = remainder % 5
    # Arguments follow the order of the labels in the format string.
    return "lc=%u pb=%u lp=%u" % (lc, pb, lp)

class CoderID(FieldSet):
    """
    One coder description.

    The first byte packs the length of the codec identifier (low 4 bits),
    a flag announcing explicit input/output stream counts (0x10) and a flag
    announcing a properties block (0x20).
    """
    CODECS = {
        # Only 2 methods ... and what about PPMD ?
        "\0"    : "copy",
        "\3\1\1": "lzma",
    }

    def createFields(self):
        header = UInt8(self, "id_size")
        yield header
        flags = header.value
        self.info("ID=%u" % flags)

        # Codec identifier: the field is named after the codec when known
        id_length = flags & 0xF
        if id_length > 0:
            id_addr = self.absolute_address+self.current_size
            raw_id = self.stream.readBytes(id_addr, id_length)
            if raw_id in self.CODECS:
                field_name = self.CODECS[raw_id]
                self.info("Codec is %s" % field_name)
            else:
                self.info("Undetermined codec %s" % raw_id)
                field_name = "unknown"
            yield RawBytes(self, field_name, id_length)

        # Explicit stream counts
        if flags & 0x10:
            yield SZUInt64(self, "num_stream_in")
            yield SZUInt64(self, "num_stream_out")
            stream_counts = (self["num_stream_in"].value,
                             self["num_stream_out"].value)
            self.info("Streams: IN=%u    OUT=%u" % stream_counts)

        # Codec properties
        if flags & 0x20:
            props = SZUInt64(self, "properties_size[]")
            yield props
            props_length = props.value
            if props_length == 5:
                # LzmaDecodeProperties@LZMAStateDecode.c
                yield textHandler(UInt8(self, "parameters"), lzmaParams)
                yield filesizeHandler(UInt32(self, "dictionary_size"))
            elif props_length > 0:
                yield RawBytes(self, "properties[]", props_length)

class CoderInfo(FieldSet):
    """
    Coder entry of a folder: the effective CoderID, followed by alternative
    coder IDs for as long as the 0x80 bit of the previously peeked first
    byte is set.

    ``in_streams`` and ``out_streams`` default to 1 and are replaced by the
    values of the effective coder when it declares them (flag 0x10).
    """
    def __init__(self, parent, name, desc=None):
        FieldSet.__init__(self, parent, name, desc)
        self.in_streams = 1
        self.out_streams = 1

    def createFields(self):
        stream = self.parent.stream

        # Peek at the flag byte of the real coder before parsing it
        flags = stream.readBits(self.absolute_address + self.current_size,
                                8, LITTLE_ENDIAN)
        main_coder = CoderID(self, "coder_id")
        yield main_coder
        if flags & 0x10: # Work repeated, ...
            self.in_streams = main_coder["num_stream_in"].value
            self.out_streams = main_coder["num_stream_out"].value

        # Skip other IDs: each one is peeked, then parsed as a CoderID
        while flags & 0x80:
            flags = stream.readBits(self.absolute_address + self.current_size,
                                    8, LITTLE_ENDIAN)
            yield CoderID(self, "unused_codec_id[]")

class BindPairInfo(FieldSet):
    """
    Bind pair: an input index followed by an output index, both stored as
    variable-length integers.
    """
    def createFields(self):
        # 64 bits values then cast to 32 in fact
        in_index = SZUInt64(self, "in_index")
        yield in_index
        out_index = SZUInt64(self, "out_index")
        yield out_index
        self.info("Indexes: IN=%u   OUT=%u" % (in_index.value, out_index.value))

class FolderItem(FieldSet):
    """
    Folder description: number of coders, one CoderInfo per coder, the bind
    pairs and, when more than one packed stream is involved, the packed
    stream indexes.

    ``in_streams`` and ``out_streams`` accumulate the stream counts of all
    coders while the fields are being created.
    """
    def __init__(self, parent, name, desc=None):
        FieldSet.__init__(self, parent, name, desc)
        self.in_streams = 0
        self.out_streams = 0

    def createFields(self):
        coder_count_field = SZUInt64(self, "num_coders")
        yield coder_count_field
        coder_count = coder_count_field.value
        self.info("Folder: %u codecs" % coder_count)

        # Coders info: sum up the streams declared by every coder
        for _ in xrange(coder_count):
            coder = CoderInfo(self, "coder_info[]")
            yield coder
            self.in_streams += coder.in_streams
            self.out_streams += coder.out_streams

        # Bind pairs: one less than the number of output streams
        self.info("out streams: %u" % self.out_streams)
        for _ in xrange(self.out_streams-1):
            yield BindPairInfo(self, "bind_pair[]")

        # Packed streams: only listed when there is not exactly one
        # @todo: Actually find mapping
        packed_streams = self.in_streams - self.out_streams + 1
        if packed_streams != 1:
            for _ in xrange(packed_streams):
                yield SZUInt64(self, "pack_stream[]")


class UnpackInfo(FieldSet):
    """
    Unpack information: the folder list, the unpacked size of every output
    stream of every folder, then optional digests up to the end marker.
    """
    def createFields(self):
        yield Enum(UInt8(self, "id"), ID_INFO)

        # Wait for synch
        for marker in waitForID(self, ID_FOLDER, "folder_marker"):
            yield marker
        folder_count_field = SZUInt64(self, "num_folders")
        yield folder_count_field

        # Get generic info
        folder_count = folder_count_field.value
        self.info("%u folders" % folder_count)
        yield UInt8(self, "is_external")

        # Read folder items, keeping them to query their stream counts below
        folders = []
        for _ in xrange(folder_count):
            folder = FolderItem(self, "folder_item[]")
            yield folder
            folders.append(folder)

        # Get unpack sizes for each coder of each folder
        for marker in waitForID(self, ID_CODERS_UNPACK_SIZE, "coders_unpsize_marker"):
            yield marker
        for folder in folders:
            for _ in xrange(folder.out_streams):
                yield SZUInt64(self, "unpack_size[]")

        # Extract digests
        while not self.eof:
            position = self.absolute_address+self.current_size
            uid = self.stream.readBits(position, 8, LITTLE_ENDIAN)
            if uid == ID_END:
                yield Enum(UInt8(self, "end_marker"), ID_INFO)
                break
            if uid == ID_CRC:
                yield HashDigest(self, "hash_digest", folder_count)
            else:
                yield SkippedData(self, "skip_data")

class SubStreamInfo(FieldSet):
    """
    Substreams information.  Only the identifier byte is produced; parsing
    of the remainder is not implemented and raises ParserError.
    """
    def createFields(self):
        marker = Enum(UInt8(self, "id"), ID_INFO)
        yield marker
        raise ParserError("SubStreamInfo not implemented yet")

class EncodedHeader(FieldSet):
    """
    Encoded header: an identifier byte followed by pack, unpack and
    substreams sections, read until the end marker.  An unexpected
    identifier is logged and stops the parsing of this field set.
    """
    def createFields(self):
        yield Enum(UInt8(self, "id"), ID_INFO)

        # identifier -> (field set class, field name)
        sections = {
            ID_PACK_INFO: (PackInfo, "pack_info"),
            ID_UNPACK_INFO: (UnpackInfo, "unpack_info"),
            ID_SUBSTREAMS_INFO: (SubStreamInfo, "substreams_info"),
        }
        while not self.eof:
            position = self.absolute_address+self.current_size
            uid = self.stream.readBits(position, 8, LITTLE_ENDIAN)
            if uid == ID_END:
                yield Enum(UInt8(self, "end_marker"), ID_INFO)
                break
            if uid not in sections:
                self.info("Unexpected ID (%i)" % uid)
                break
            section_class, section_name = sections[uid]
            yield section_class(self, section_name, ID_INFO[uid])

class IDHeader(FieldSet):
    """
    Plain (non-encoded) header.  Only the identifier byte is produced; the
    rest of the header is not interpreted yet.
    """
    def createFields(self):
        yield Enum(UInt8(self, "id"), ID_INFO)
        # The exception used to be built and silently dropped.  Report the
        # limitation through the log instead of raising, so that this field
        # set is still created for the code that looks it up by name.
        self.warning("IDHeader not implemented")

class NextHeader(FieldSet):
    """
    Header located after the body.  Its size (in bits) is taken from the
    ``next_hdr_size`` field of the start header.
    """
    def __init__(self, parent, name, desc="Next header"):
        FieldSet.__init__(self, parent, name, desc)
        self._size = 8*self["/signature/start_hdr/next_hdr_size"].value

    # Less work, as much interpretable information as the other
    # version... what an obnoxious format
    def createFields2(self):
        """Alternative layout: the type byte, then everything else as raw data."""
        yield Enum(UInt8(self, "header_type"), ID_INFO)
        # self._size is expressed in bits whereas RawBytes expects bytes
        data_size = self._size//8 - 1
        if data_size > 0:
            yield RawBytes(self, "header_data", data_size)

    def createFields(self):
        """
        Dispatch on the first byte to the plain or encoded header, then
        expose whatever is left of the declared size as raw bytes.
        """
        uid = self.stream.readBits(self.absolute_address, 8, LITTLE_ENDIAN)
        if uid == ID_HEADER:
            yield IDHeader(self, "header", ID_INFO[ID_HEADER])
        elif uid == ID_ENCODED_HEADER:
            yield EncodedHeader(self, "encoded_hdr", ID_INFO[ID_ENCODED_HEADER])
            # Game Over: this is usually encoded using LZMA, not copy
            # See SzReadAndDecodePackedStreams/SzDecode being called with the
            # data position from "/next_hdr/encoded_hdr/pack_info/pack_pos"
            # We should process further, yet we can't...
        else:
            # The exception used to be built and silently dropped.  Log the
            # problem instead of raising so that the remaining bytes are
            # still exposed as raw data below.
            self.warning("Unexpected ID %u" % uid)
        size = (self._size - self.current_size)//8
        if size > 0:
            yield RawBytes(self, "next_hdr_data", size, "Next header's data")

class Body(FieldSet):
    """
    Data located between the signature header and the next header.  Its
    size (in bits) is taken from the ``next_hdr_offset`` field of the start
    header.
    """
    def __init__(self, parent, name, desc="Body data"):
        FieldSet.__init__(self, parent, name, desc)
        self._size = 8*self["/signature/start_hdr/next_hdr_offset"].value

    def createFields(self):
        """
        Split the body according to the pack information of the encoded
        header when there is one; otherwise expose it as a single block.
        Every RawBytes field is guarded so that an empty field is never
        requested, like the other raw fields of this module.
        """
        total = self._size//8
        if "encoded_hdr" in self["/next_hdr/"]:
            pack_size = sum(s.value for s in self.array("/next_hdr/encoded_hdr/pack_info/pack_size"))
            body_size = self["/next_hdr/encoded_hdr/pack_info/pack_pos"].value
            if body_size > 0:
                yield RawBytes(self, "compressed_data", body_size, "Compressed data")
            # Here we could check if copy method was used to "compress" it,
            # but this never happens, so just output "compressed file info"
            if pack_size > 0:
                yield RawBytes(self, "compressed_file_info", pack_size,
                               "Compressed file information")
            size = total - pack_size - body_size
            if size > 0:
                yield RawBytes(self, "unknown_data", size)
        elif "header" in self["/next_hdr"]:
            if total > 0:
                yield RawBytes(self, "compressed_data", total, "Compressed data")

class StartHeader(FieldSet):
    """
    Start header: offset, size and CRC of the next header.
    """
    # 64-bit offset + 64-bit size + 32-bit CRC
    static_size = 64 + 64 + 32

    def createFields(self):
        offset = UInt64(self, "next_hdr_offset", "Next header offset")
        yield textHandler(offset, hexadecimal)
        yield UInt64(self, "next_hdr_size", "Next header size")
        crc = UInt32(self, "next_hdr_crc", "Next header CRC")
        yield textHandler(crc, hexadecimal)

class SignatureHeader(FieldSet):
    """
    Signature header: magic bytes, archive version, CRC of the start header
    and the start header itself.
    """
    # 6 signature bytes + 2 version bytes + 4 CRC bytes, then the start header
    static_size = 96 + StartHeader.static_size

    def createFields(self):
        yield Bytes(self, "signature", 6, "Signature Header")
        yield UInt8(self, "major_ver", "Archive major version")
        yield UInt8(self, "minor_ver", "Archive minor version")
        start_crc = UInt32(self, "start_hdr_crc", "Start header CRC")
        yield textHandler(start_crc, hexadecimal)
        yield StartHeader(self, "start_hdr", "Start header")

class SevenZipParser(Parser):
    """
    Parser for 7z archives: signature header, body data and next header.
    """
    PARSER_TAGS = {
        "id": "7zip",
        "category": "archive",
        "file_ext": ("7z",),
        "mime": (u"application/x-7z-compressed",),
        "min_size": 32*8,
        "magic": (("7z\xbc\xaf\x27\x1c", 0),),
        "description": "Compressed archive in 7z format"
    }
    endian = LITTLE_ENDIAN

    def createFields(self):
        yield SignatureHeader(self, "signature", "Signature Header")
        yield Body(self, "body_data")
        yield NextHeader(self, "next_hdr")

    def validate(self):
        """Return True when the file starts with the 7z signature, else an error text."""
        signature = self.stream.readBytes(0,6)
        if signature == "7z\xbc\xaf'\x1c":
            return True
        return "Invalid signature"

    def createContentSize(self):
        """Return the archive size in bits: both headers, the body and the next header."""
        next_hdr_offset = self["/signature/start_hdr/next_hdr_offset"].value
        next_hdr_size = self["/signature/start_hdr/next_hdr_size"].value
        # 12 bytes of signature plus 20 bytes of start header
        byte_count = next_hdr_offset + next_hdr_size + 12 + 20
        return byte_count*8
# hachoir-parser documentation built on Sept. 20, 2017, 5:30 p.m.