# hachoir_parser/container/ogg.py

#
# Ogg parser
# Author Julien Muchembled <jm AT jm10.no-ip.com>
# Created: 10 june 2006
#

from hachoir_parser import Parser
from hachoir_core.field import (Field, FieldSet, createOrphanField,
    NullBits, Bit, Bits, Enum, Fragment, MissingField, ParserError,
    UInt8, UInt16, UInt24, UInt32, UInt64,
    RawBytes, String, PascalString32, NullBytes)
from hachoir_core.stream import FragmentedStream, InputStreamError
from hachoir_core.endian import LITTLE_ENDIAN, BIG_ENDIAN
from hachoir_core.tools import humanDurationNanosec
from hachoir_core.text_handler import textHandler, hexadecimal

# Upper bound, in bytes, of the range that OggFile.createLastPage searches
# for the last page (it is multiplied by 8 there to get a bit address).
MAX_FILESIZE = 1000 * 1024 * 1024

class XiphInt(Field):
    """
    Variable-length unsigned integer: the value is the sum of consecutive
    bytes, reading stops after the first byte different from 255.

    Example: 1000 is encoded as the bytes (255, 255, 255, 235) because
    255 + 255 + 255 + 235 = 1000.

    max_size is an optional limit, in bits, on how much may be consumed.
    """
    def __init__(self, parent, name, max_size=None, description=None):
        Field.__init__(self, parent, name, size=0, description=description)
        stream = parent.stream
        position = self.absolute_address
        total = 0
        # The field starts with a size of zero bits and grows by one byte
        # for every byte that is read.
        while max_size is None or self._size < max_size:
            octet = stream.readBits(position, 8, LITTLE_ENDIAN)
            position += 8
            self._size += 8
            total += octet
            if octet != 0xff:
                # A byte other than 255 terminates the integer.
                break
        self.createValue = lambda: total

class Lacing(FieldSet):
    """
    Table made of consecutive XiphInt fields named "size[]"; entries are
    read until every bit of the field set has been consumed.
    """
    def createFields(self):
        remaining = self.size
        while remaining:
            # Each entry may use at most the bits that are still unread.
            entry = XiphInt(self, 'size[]', remaining)
            yield entry
            remaining -= entry.size

def parseVorbisComment(parent):
    """
    Yield the fields of a comment chunk: a vendor string, the number of
    metadata entries, the entries themselves and, when unread data remains
    in parent, a one-byte framing flag.
    """
    yield PascalString32(parent, 'vendor', charset="UTF-8")
    yield UInt32(parent, 'count')
    entry_count = parent["count"].value
    for _ in xrange(entry_count):
        yield PascalString32(parent, 'metadata[]', charset="UTF-8")
    if parent.current_size != parent.size:
        yield UInt8(parent, "framing_flag")

# Display names for the 2-bit "pixel_format" value of the Theora header
# (value 1 has no entry).
PIXEL_FORMATS = dict((
    (0, "4:2:0"),
    (2, "4:2:2"),
    (3, "4:4:4"),
))

def formatTimeUnit(field):
    """
    Text handler: format the field value, multiplied by 100, with
    humanDurationNanosec.
    """
    nanoseconds = field.value * 100
    return humanDurationNanosec(nanoseconds)

def parseVideoHeader(parent):
    """
    Yield the fields of a "video" header chunk (everything located after
    the type/codec prefix read by Chunk).
    """
    yield NullBytes(parent, "padding[]", 2)
    yield String(parent, "fourcc", 4)
    yield UInt32(parent, "size")

    # The frame duration is displayed through formatTimeUnit.
    frame_duration = UInt64(parent, "time_unit", "Frame duration")
    yield textHandler(frame_duration, formatTimeUnit)

    # Every field is created right before it is yielded, like in a plain
    # sequence of yield statements.
    plain_fields = (
        (UInt64, "sample_per_unit"),
        (UInt32, "default_len"),
        (UInt32, "buffer_size"),
        (UInt16, "bits_per_sample"),
    )
    for field_type, field_name in plain_fields:
        yield field_type(parent, field_name)

    yield NullBytes(parent, "padding[]", 2)
    for dimension in ("width", "height"):
        yield UInt32(parent, dimension)
    yield NullBytes(parent, "padding[]", 4)

def parseTheoraHeader(parent):
    """
    Yield the fields of a Theora header chunk (everything located after the
    type/codec prefix read by Chunk): version, frame geometry, frame rate,
    aspect ratio and encoder settings.
    """
    # Version triplet, one byte each
    for version_part in ("version_major", "version_minor", "version_revision"):
        yield UInt8(parent, version_part)

    # Geometry
    yield UInt16(parent, "width", "Width*16 in pixel")
    yield UInt16(parent, "height", "Height*16 in pixel")
    for frame_dimension in ("frame_width", "frame_height"):
        yield UInt24(parent, frame_dimension)
    for frame_offset in ("offset_x", "offset_y"):
        yield UInt8(parent, frame_offset)

    # Frame rate and aspect ratio, each stored as a numerator/denominator pair
    ratios = (
        (UInt32, "fps_num", "Frame per second numerator"),
        (UInt32, "fps_den", "Frame per second denominator"),
        (UInt24, "aspect_ratio_num", "Aspect ratio numerator"),
        (UInt24, "aspect_ratio_den", "Aspect ratio denominator"),
    )
    for field_type, field_name, field_desc in ratios:
        yield field_type(parent, field_name, field_desc)

    # Encoder settings; the last four fields are bit fields (6+5+2+3 bits)
    yield UInt8(parent, "color_space")
    yield UInt24(parent, "target_bitrate")
    yield Bits(parent, "quality", 6)
    yield Bits(parent, "gp_shift", 5)
    pixel_format = Bits(parent, "pixel_format", 2)
    yield Enum(pixel_format, PIXEL_FORMATS)
    yield Bits(parent, "spare_config", 3)

def parseVorbisHeader(parent):
    """
    Yield the fields of a Vorbis header chunk (everything located after the
    type/codec prefix read by Chunk).
    """
    yield UInt32(parent, "vorbis_version")
    yield UInt8(parent, "audio_channels")
    # Four consecutive 32-bit values
    for name in ("audio_sample_rate", "bitrate_maximum",
                 "bitrate_nominal", "bitrate_minimum"):
        yield UInt32(parent, name)
    # Two 4-bit values sharing one byte
    for name in ("blocksize_0", "blocksize_1"):
        yield Bits(parent, name, 4)
    yield UInt8(parent, "framing_flag")

class Chunk(FieldSet):
    """
    One chunk of page payload.

    A chunk of at least 7 bytes starts with a one-byte type and a six-byte
    codec name. When the (codec, type) pair is listed in tag_info, the chunk
    is renamed and its content is parsed by the associated function;
    otherwise the remaining bytes are exposed as a single "raw" field.
    """
    # codec name -> chunk type -> (field name, content parser)
    tag_info = {
        "vorbis": {
            3: ("comment", parseVorbisComment),
            1: ("vorbis_hdr", parseVorbisHeader),
        },
        "theora": {
            128: ("theora_hdr", parseTheoraHeader),
            129: ("comment", parseVorbisComment),
        },
        "video\0": {
            1: ("video_hdr", parseVideoHeader),
        },
    }

    def __init__(self, *args, **kw):
        FieldSet.__init__(self, *args, **kw)
        if self.size < 7*8:
            # Too small to hold the type/codec prefix
            self.parser = None
            return
        try:
            per_type = self.tag_info[self["codec"].value]
            name, parser = per_type[self["type"].value]
        except KeyError:
            # Unknown codec or chunk type: keep the default name
            self.parser = None
            return
        self._name = name
        self.parser = parser
        if self._name == "theora_hdr":
            self.endian = BIG_ENDIAN

    def createFields(self):
        if self.size >= 7*8:
            yield UInt8(self, 'type')
            yield String(self, 'codec', 6)
        # self.parser is only read here, after the prefix has been yielded
        content_parser = self.parser
        if not content_parser:
            leftover = (self.size - self.current_size) // 8
            if leftover:
                yield RawBytes(self, "raw", leftover)
            return
        for field in content_parser(self):
            yield field

class Packets:
    """
    Iterable over the packet sizes, in bits, of a chain of fragments.

    first is the first fragment of the chain. Each fragment must provide:
     - parent: the page, with a "continued_packet" field and a segment_size
       list of sizes in bytes;
     - next: the following fragment, or None at the end of the chain.
    """
    def __init__(self, first):
        self.first = first

    def __iter__(self):
        """
        Yield the size of each packet in bits.

        The first size of a page flagged "continued_packet" is added to the
        packet left open by the previous page. Packets of size zero are not
        yielded.
        """
        fragment = self.first
        size = None
        while fragment is not None:
            page = fragment.parent
            continued_packet = page["continued_packet"].value
            for segment_size in page.segment_size:
                if continued_packet:
                    # size is still None when the very first page of the
                    # chain is flagged as continued (no previous page was
                    # seen): start from zero instead of failing with a
                    # TypeError on None + int.
                    size = (size or 0) + segment_size
                    continued_packet = False
                else:
                    if size:
                        yield size * 8
                    size = segment_size
            fragment = fragment.next
        # Last packet of the chain
        if size:
            yield size * 8

class Segments(Fragment):
    """
    Payload of one page, linked with the payloads of the other pages that
    carry the same serial number.

    The first Segments registered for a serial number in the "streams" dict
    of the container (parent of the page) becomes the head of the chain.
    """
    def __init__(self, parent, *args, **kw):
        Fragment.__init__(self, parent, *args, **kw)
        if parent['last_page'].value:
            # Page flagged as last page: nothing follows this fragment
            next_link = None
        else:
            # The next fragment is looked up by createNext
            next_link = self.createNext
        first = parent.parent.streams.setdefault(parent['serial'].value, self)
        self.setLinks(first, next_link)

    def _createInputStream(self, **args):
        """
        The head of the chain creates a FragmentedStream tagged "ogg_stream"
        whose packets are described by Packets; other fragments use the
        default Fragment implementation.
        """
        if self.first is self:
            return FragmentedStream(self, packets=Packets(self), tags=[("id","ogg_stream")], **args)
        return Fragment._createInputStream(self, **args)

    def _getData(self):
        return self

    def createNext(self):
        """
        Return the fragment of the first following page that belongs to the
        same chain (same "first"), or None when there is no such page.
        """
        page = self.parent
        index = page.index
        container = page.parent
        first = self.first
        name = self.name
        try:
            while True:
                index += 1
                # Raises MissingField once there is no page left
                candidate_page = container[index]
                if name not in candidate_page:
                    # A page with lacing_size == 0 has no payload field:
                    # skip it instead of ending the chain there.
                    continue
                candidate = candidate_page[name]
                if candidate.first is first:
                    return candidate
        except MissingField:
            return None

    def createFields(self):
        """Yield one Chunk per non-empty entry of the page's segment_size."""
        for segment_size in self.parent.segment_size:
            if segment_size:
                yield Chunk(self, "chunk[]", size=segment_size*8)

class OggPage(FieldSet):
    """
    One Ogg page: a 27-byte header, then (only when lacing_size is not zero)
    a "lacing" table followed by the "segments" payload.

    Attributes set by __init__:
     - lacing_size: value of the "lacing_size" field (size of the lacing
       table in bytes);
     - segment_size: list of the values of the lacing entries, in bytes
       (only set when lacing_size is not zero).
    """
    # Expected value of the "capture_pattern" field
    MAGIC = "OggS"

    def __init__(self, *args):
        FieldSet.__init__(self, *args)
        # Size in bytes of the fixed header fields (4+1+1+8+4+4+4+1)
        size = 27
        # NOTE(review): reading fields here presumably runs createFields()
        # lazily, up to the requested field -- createFields() itself reads
        # self.lacing_size and self._size, so statement order matters.
        self.lacing_size = self['lacing_size'].value
        if self.lacing_size:
            size += self.lacing_size
            lacing = self['lacing']
            self.segment_size = [ field.value for field in lacing ]
            size += sum(self.segment_size)
        # Total page size, converted from bytes to bits
        self._size = size * 8

    def createFields(self):
        yield String(self, 'capture_pattern', 4, charset="ASCII")
        # A wrong signature is only reported as a warning here; validate()
        # is the method that rejects it.
        if self['capture_pattern'].value != self.MAGIC:
            self.warning('Invalid signature. An Ogg page must start with "%s".' % self.MAGIC)
        yield UInt8(self, 'stream_structure_version')
        # Flags byte: three flag bits followed by five unused bits
        yield Bit(self, 'continued_packet')
        yield Bit(self, 'first_page')
        yield Bit(self, 'last_page')
        yield NullBits(self, 'unused', 5)
        yield UInt64(self, 'abs_granule_pos')
        yield textHandler(UInt32(self, 'serial'), hexadecimal)
        yield UInt32(self, 'page')
        yield textHandler(UInt32(self, 'checksum'), hexadecimal)
        yield UInt8(self, 'lacing_size')
        # self.lacing_size and self._size are assigned by __init__
        if self.lacing_size:
            yield Lacing(self, "lacing", size=self.lacing_size*8)
            # The payload takes all the bits left in the page
            yield Segments(self, "segments", size=self._size-self._current_size)

    def validate(self):
        """
        Return an empty string when the signature and the structure version
        are the expected ones, otherwise an error message.
        """
        if self['capture_pattern'].value != self.MAGIC:
            return "Wrong signature"
        if self['stream_structure_version'].value != 0:
            return "Unknown structure version (%s)" % self['stream_structure_version'].value
        return ""

class OggFile(Parser):
    """
    Parser of an Ogg file, read as a sequence of OggPage fields named
    "page[]".

    The "streams" attribute (set by createFields) maps a page serial number
    to the first Segments field found with that serial number.
    """
    PARSER_TAGS = {
        "id": "ogg",
        "category": "container",
        "file_ext": ("ogg", "ogm"),
        "mime": (
            u"application/ogg", u"application/x-ogg",
            u"audio/ogg", u"audio/x-ogg",
            u"video/ogg", u"video/x-ogg",
            u"video/theora", u"video/x-theora",
        ),
        "magic": ((OggPage.MAGIC, 0),),
        "subfile": "skip",
        "min_size": 28*8,
        "description": "Ogg multimedia container"
    }
    endian = LITTLE_ENDIAN

    def validate(self):
        """
        Check the magic string, then validate up to the first three pages.

        Return True when the file looks valid, otherwise an error message.
        """
        magic = OggPage.MAGIC
        if self.stream.readBytes(0, len(magic)) != magic:
            return "Invalid magic string"
        # Validate first 3 pages
        for index in xrange(3):
            try:
                page = self[index]
            except MissingField:
                # A file with fewer than three pages is accepted when
                # self.done is set (presumably: parsing is complete)
                if self.done:
                    return True
                return "Unable to get page #%u" % index
            except (InputStreamError, ParserError):
                return "Unable to create page #%u" % index
            err = page.validate()
            if err:
                return "Invalid page #%s: %s" % (index, err)
        return True

    def _firstPageHasChunk(self, name):
        """
        Tell whether the payload of the first page contains a chunk called
        name. A first page without payload (no "segments" field) simply
        contains no chunk.
        """
        try:
            segments = self["page[0]/segments"]
        except MissingField:
            return False
        return name in segments

    def createMimeType(self):
        """Guess the MIME type from the header chunk of the first page."""
        if self._firstPageHasChunk("theora_hdr"):
            return u"video/theora"
        elif self._firstPageHasChunk("vorbis_hdr"):
            return u"audio/vorbis"
        else:
            return u"application/ogg"

    def createDescription(self):
        """Describe the file from the header chunk of the first page."""
        # The header chunks are children of "page[0]/segments", not of
        # "page[0]" itself: looking them up in the page never matches.
        if self._firstPageHasChunk("theora_hdr"):
            return u"Ogg/Theora video"
        elif self._firstPageHasChunk("vorbis_hdr"):
            return u"Ogg/Vorbis audio"
        else:
            return u"Ogg multimedia container"

    def createFields(self):
        self.streams = {}
        while not self.eof:
            yield OggPage(self, "page[]")

    def createLastPage(self):
        """
        Search, after the first page, for a page header whose flags byte is
        5 and then for one whose flags byte is 4; return the page found as
        an orphan OggPage, or None when neither pattern is found.
        """
        start = self[0].size
        end = MAX_FILESIZE * 8
        # FIXME: This doesn't work on all files (eg. some Ogg/Theora)
        # NOTE: walking through every "OggS\0" page would not depend on
        # the flags byte, but it is very slow.
        offset = self.stream.searchBytes("OggS\0\5", start, end)
        if offset is None:
            offset = self.stream.searchBytes("OggS\0\4", start, end)
        if offset is None:
            return None
        return createOrphanField(self, offset, OggPage, "page")

    def createContentSize(self):
        """
        Return the address of the end of the last page, or None when the
        last page cannot be found.
        """
        page = self.createLastPage()
        if page:
            return page.absolute_address + page.size
        else:
            return None


class OggStream(Parser):
    """
    Parser of one logical stream: every packet announced by the input
    stream becomes a raw "packet[]" field.
    """
    PARSER_TAGS = {
        "id": "ogg_stream",
        "category": "container",
        "subfile": "skip",
        "min_size": 7*8,
        "description": "Ogg logical stream"
    }
    endian = LITTLE_ENDIAN

    def validate(self):
        # Never validates. NOTE(review): presumably this parser is only
        # meant to be selected explicitly through its "ogg_stream" id tag
        # (see Segments._createInputStream) -- confirm.
        return False

    def createFields(self):
        # Packet sizes are given in bits; RawBytes expects bytes
        for bit_count in self.stream.packets:
            byte_count = bit_count//8
            yield RawBytes(self, "packet[]", byte_count)
# hachoir-parser documentation built on Sept. 20, 2017, 5:30 p.m.