# hachoir_parser/archive/rpm.py

"""
RPM archive parser.

Author: Victor Stinner, 1st December 2005.
"""

from hachoir_parser import Parser
from hachoir_core.field import (FieldSet, ParserError,
    UInt8, UInt16, UInt32, UInt64, Enum,
    NullBytes, Bytes, RawBytes, SubFile,
    Character, CString, String)
from hachoir_core.endian import BIG_ENDIAN
from hachoir_parser.archive.gzip_parser import GzipParser
from hachoir_parser.archive.bzip2_parser import Bzip2Parser

class ItemContent(FieldSet):
    """
    Content (value or values) described by one item of a property set.

    The field set is renamed "content_<item name>" after the item it belongs
    to, and reuses that item's description.
    """

    # Field class used to parse one value, indexed by the item "type" code
    # (the codes are named in Item.type_name).
    format_type = {
        0: UInt8,
        1: Character,
        2: UInt8,
        3: UInt16,
        4: UInt32,
        5: UInt64,
        6: CString,
        7: RawBytes,
        8: CString,
        9: CString
    }

    def __init__(self, parent, name, item):
        """
        parent, name: passed to FieldSet.
        item: field set exposing "type" and "count" fields, plus "name" and
        "description" attributes; it is kept as self.related_item.
        """
        FieldSet.__init__(self, parent, name, item.description)
        self.related_item = item
        # Override the generic name given by the caller so that the content
        # can be matched with its item.
        self._name = "content_%s" % item.name

    def createFields(self):
        """
        Yield the value field(s) of the related item.

        A RawBytes item is a single "value" field of `count` bytes. Any other
        type yields `count` fields, named "value" when there is at most one
        and "value[]" otherwise.

        Raises ParserError if the item type code is not in format_type.
        """
        item = self.related_item
        type_id = item["type"].value
        try:
            cls = self.format_type[type_id]
        except KeyError:
            # Report malformed input the same way as the other parsers of
            # this module instead of leaking a bare KeyError.
            raise ParserError("Unknown RPM item type (%s)" % type_id)
        count = item["count"].value

        if cls is RawBytes:
            # For binary data, count is the length in bytes of one field.
            yield RawBytes(self, "value", count)
            return

        if 1 < count:
            field_name = "value[]"
        else:
            field_name = "value"
        for _ in xrange(count):
            yield cls(self, field_name)

class Item(FieldSet):
    """
    One 16-byte index entry: tag, type, offset and count (four UInt32).

    The tag is displayed using `tag_name_dict` (Item.tag_name by default) and
    the type using Item.type_name.
    """

    # Display name of each "type" code.
    type_name = {
        0: "NULL",
        1: "CHAR",
        2: "INT8",
        3: "INT16",
        4: "INT32",
        5: "INT64",
        6: "CSTRING",
        7: "BIN",
        8: "CSTRING_ARRAY",
        9: "CSTRING?"
    }
    # Default display name of each "tag" code.
    tag_name = {
        1000: "File size",
        1001: "(Broken) MD5 signature",
        1002: "PGP 2.6.3 signature",
        1003: "(Broken) MD5 signature",
        1004: "MD5 signature",
        1005: "GnuPG signature",
        1006: "PGP5 signature",
        1007: "Uncompressed payload size (bytes)",
        256+8: "Broken SHA1 header digest",
        256+9: "Broken SHA1 header digest",
        # 256+13 is the valid SHA1 header digest; only 256+8 and 256+9 are
        # the obsolete broken variants.
        256+13: "SHA1 header digest",
        256+11: "DSA header signature",
        256+12: "RSA header signature"
    }

    def __init__(self, parent, name, description=None, tag_name_dict=None):
        """
        parent, name, description: passed to FieldSet.
        tag_name_dict: mapping from tag value to display name; defaults to
        Item.tag_name when None.
        """
        FieldSet.__init__(self, parent, name, description)
        if tag_name_dict is None:
            tag_name_dict = Item.tag_name
        self.tag_name_dict = tag_name_dict

    def createFields(self):
        """Yield the four UInt32 fields: tag, type, offset and count."""
        yield Enum(UInt32(self, "tag", "Tag"), self.tag_name_dict)
        yield Enum(UInt32(self, "type", "Type"), Item.type_name)
        yield UInt32(self, "offset", "Offset")
        yield UInt32(self, "count", "Count")

    def createDescription(self):
        """Return "Item: <tag> (<type>)" built from the display values."""
        return "Item: %s (%s)" % (self["tag"].display, self["type"].display)

class ItemHeader(Item):
    """
    Item whose tag is displayed with the names listed in ItemHeader.tag_name
    instead of the default Item.tag_name table.
    """

    # Display name of each "tag" code.
    tag_name = {
        61: "Current image",
        62: "Signatures",
        63: "Immutable",
        64: "Regions",
        100: "I18N string locales",
        1000: "Name",
        1001: "Version",
        1002: "Release",
        1003: "Epoch",
        1004: "Summary",
        1005: "Description",
        1006: "Build time",
        1007: "Build host",
        1008: "Install time",
        1009: "Size",
        1010: "Distribution",
        1011: "Vendor",
        1012: "Gif",
        1013: "Xpm",
        1014: "Licence",
        1015: "Packager",
        1016: "Group",
        1017: "Changelog",
        1018: "Source",
        1019: "Patch",
        1020: "Url",
        1021: "OS",
        1022: "Arch",
        1023: "Prein",
        1024: "Postin",
        1025: "Preun",
        1026: "Postun",
        1027: "Old filenames",
        1028: "File sizes",
        1029: "File states",
        1030: "File modes",
        1031: "File uids",
        1032: "File gids",
        1033: "File rdevs",
        1034: "File mtimes",
        1035: "File MD5s",
        1036: "File link to's",
        1037: "File flags",
        1038: "Root",
        1039: "File username",
        1040: "File groupname",
        1043: "Icon",
        1044: "Source rpm",
        1045: "File verify flags",
        1046: "Archive size",
        1047: "Provide name",
        1048: "Require flags",
        1049: "Require name",
        1050: "Require version",
        1051: "No source",
        1052: "No patch",
        1053: "Conflict flags",
        1054: "Conflict name",
        1055: "Conflict version",
        1056: "Default prefix",
        1057: "Build root",
        1058: "Install prefix",
        1059: "Exclude arch",
        1060: "Exclude OS",
        1061: "Exclusive arch",
        1062: "Exclusive OS",
        1064: "RPM version",
        1065: "Trigger scripts",
        1066: "Trigger name",
        1067: "Trigger version",
        1068: "Trigger flags",
        1069: "Trigger index",
        1079: "Verify script",
        #TODO: Finish the list (id 1070..1162 using rpm library source code)
    }

    def __init__(self, parent, name, description=None):
        """Build an Item that uses this class' tag_name table for its tag."""
        Item.__init__(
            self, parent, name,
            description=description,
            tag_name_dict=self.tag_name)

def sortRpmItem(a, b):
    """
    cmp-style comparison of two items on their "offset" value.

    Returns the difference (offset of a) - (offset of b) converted to int:
    negative when a comes first, zero when both offsets are equal and
    positive when b comes first.
    """
    first_offset = a["offset"].value
    second_offset = b["offset"].value
    return int(first_offset - second_offset)

class PropertySet(FieldSet):
    """
    Property set: a 16-byte header (3-byte signature, version, 4 reserved
    bytes, item count, content size) followed by `count` ItemHeader entries
    and then the content of each item, located by the item "offset".
    """

    def __init__(self, parent, name, *args):
        """Create the field set and fix its total size (in bits)."""
        FieldSet.__init__(self, parent, name, *args)
        # NOTE(review): the size is taken from the address of the field named
        # "content_item[1]" plus the declared content size. This presumes that
        # the set has at least two items and that the content of item[1] is
        # where the content area begins -- confirm against the format.
        self._size = self["content_item[1]"].address + self["size"].value * 8

    def createFields(self):
        """
        Yield the header fields, the item headers, then the item contents
        with NullBytes padding wherever the offsets leave a gap.

        Raises ParserError if the 3-byte signature is not \\x8E\\xAD\\xE8.
        """
        # Read chunk header
        yield Bytes(self, "signature", 3, r"Property signature (\x8E\xAD\xE8)")
        if self["signature"].value != "\x8E\xAD\xE8":
            raise ParserError("Invalid property signature")
        yield UInt8(self, "version", "Signature version")
        yield NullBytes(self, "reserved", 4, "Reserved")
        yield UInt32(self, "count", "Count")
        yield UInt32(self, "size", "Size")

        # Read item header
        items = []
        for _ in range(self["count"].value):
            item = ItemHeader(self, "item[]")
            yield item
            items.append(item)

        # Sort items by their offset. A key function gives the same (stable)
        # order as the former cmp-style sort and does not rely on the
        # positional cmp argument of list.sort().
        items.sort(key=lambda entry: entry["offset"].value)

        # Read item content. current_size is in bits; "//" keeps the byte
        # positions integers whatever the division semantics in effect.
        start = self.current_size // 8
        for item in items:
            offset = item["offset"].value
            # Gap between the current position and the content of this item
            diff = offset - (self.current_size // 8 - start)
            if 0 < diff:
                yield NullBytes(self, "padding[]", diff)
            yield ItemContent(self, "content[]", item)

        # Fill whatever remains of the declared content size
        size = start + self["size"].value - self.current_size // 8
        if 0 < size:
            yield NullBytes(self, "padding[]", size)

class RpmFile(Parser):
    """
    RPM package parser: a fixed-size file header, two property sets
    ("checksum" and "header"), then the remaining bytes as a bzip2 or gzip
    sub-file.
    """

    PARSER_TAGS = {
        "id": "rpm",
        "category": "archive",
        "file_ext": ("rpm",),
        "mime": (u"application/x-rpm",),
        "min_size": (96 + 16 + 16)*8, # file header + checksum + content header
        "magic": (('\xED\xAB\xEE\xDB', 0),),
        "description": "RPM package"
    }
    # Display name of each "type" value.
    TYPE_NAME = {
        0: "Binary",
        1: "Source"
    }
    endian = BIG_ENDIAN

    def validate(self):
        """
        Check the signature, the major version (only 3 is accepted) and the
        RPM type. Return True when valid, else an error message.
        """
        if self["signature"].value != '\xED\xAB\xEE\xDB':
            return "Invalid signature"
        major = self["major_ver"].value
        if major != 3:
            return "Unknown major version (%u)" % major
        if self["type"].value not in self.TYPE_NAME:
            return "Invalid RPM type"
        return True

    def createFields(self):
        """
        Yield the file header fields, the two property sets and, if any bytes
        remain, a "content" sub-file.

        Raises NotImplementedError when the total size is unknown.
        """
        # File header
        yield Bytes(self, "signature", 4, r"RPM file signature (\xED\xAB\xEE\xDB)")
        yield UInt8(self, "major_ver", "Major version")
        yield UInt8(self, "minor_ver", "Minor version")
        yield Enum(UInt16(self, "type", "RPM type"), RpmFile.TYPE_NAME)
        yield UInt16(self, "architecture", "Architecture")
        yield String(self, "name", 66, "Archive name", strip="\0", charset="ASCII")
        yield UInt16(self, "os", "OS")
        yield UInt16(self, "signature_type", "Type of signature")
        yield NullBytes(self, "reserved", 16, "Reserved")

        # Property sets
        yield PropertySet(self, "checksum", "Checksum (signature)")
        yield PropertySet(self, "header", "Header")

        if self._size is None: # TODO: is it possible to handle piped input?
            raise NotImplementedError

        # Everything left after the header is the compressed content
        remaining = (self._size - self.current_size) // 8
        if not remaining:
            return

        # The content is handed to the bzip2 parser when it starts with
        # "BZh", and to the gzip parser otherwise.
        starts_with_bzh = (
            3 <= remaining
            and self.stream.readBytes(self.current_size, 3) == "BZh")
        if starts_with_bzh:
            description = "bzip2 content"
            content_parser = Bzip2Parser
        else:
            description = "gzip content"
            content_parser = GzipParser
        yield SubFile(self, "content", remaining, description, parser=content_parser)
# hachoir-parser documentation built on Sept. 20, 2017, 5:30 p.m.