User:Chorus/genprotosets.py

From the Dyson Sphere Program Wiki
< User:Chorus
Revision as of 00:24, 27 April 2024 by Chorus (talk | contribs) (Add script for scraping select Dyson Sphere Program data.)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)

The following code requires Python 3, as well as pythonnet and UnityPy.

#!/usr/bin/env python3
'''
Recommend: genprotosets.py \
    $DSP \
    --locale $DSP/Locale/1033 \
    -e Item \
    -e Recipe \
    -e Tech \
    -e Theme \
    >protosets.json
'''

import io
import itertools as it
import json
import operator as op
import os
import struct
import sys

def main(gamedir, locale, indent, enable, disable):
    '''
    Export the selected ProtoSet tables of Dyson Sphere Program as JSON.

    The game's Assembly-CSharp assembly is loaded through pythonnet, the
    assets under $DSP/DSPGAME_Data are scanned with UnityPy for
    MonoBehaviours whose name ends in "ProtoSet", and every table accepted
    by allowed() is decoded with a parser built from its .NET type
    definition.  The result is printed to stdout as a single JSON object
    keyed by table name; progress messages go to stderr.

    gamedir -- game installation directory ($DSP)
    locale  -- translation table handed to every parser
    indent  -- indent argument passed on to json.dumps
    enable  -- name prefixes of the only tables to export (may be empty)
    disable -- name prefixes of tables to skip when enable is empty

    Exits through sys.exit() when pythonnet or UnityPy cannot be imported
    or when the DSPGAME_Data directory is missing.
    '''
    try:
        import clr
    except ImportError:
        sys.exit('please install pythonnet using \"pip install pythonnet [--user]\"')
    try:
        import UnityPy
    except ImportError:
        sys.exit('please install UnityPy using \"pip install UnityPy [--user]\"')

    datadir = os.path.join(gamedir, 'DSPGAME_Data')
    if not os.path.exists(datadir):
        sys.exit('$DSP/DSPGAME_Data directory not found')

    # the managed DLL directory must be on sys.path before the assembly is
    # referenced by its bare name
    sys.path.append(os.path.join(datadir, 'Managed'))
    assembly = clr.AddReference('Assembly-CSharp')

    tables = {}
    for obj in UnityPy.load(datadir).objects:
        if obj.type.name != 'MonoBehaviour':
            continue
        behaviour = obj.read()
        if not behaviour.name.endswith('ProtoSet'):
            continue
        print(f'found MonoBehaviour "{behaviour.name}" in {behaviour.assets_file.name}', file=sys.stderr)
        if not allowed(behaviour.name, enable, disable):
            print(f'skipping "{behaviour.name}"', file=sys.stderr)
            continue
        # the MonoBehaviour name doubles as the name of its .NET type
        decode = parserForType(assembly.GetType(behaviour.name))
        stream = io.BytesIO(behaviour.raw_data)
        tables[behaviour.name] = decode(stream, locale)
    print(json.dumps(tables, indent=indent))

def loadlocale(localedir):
    '''
    load translations from locale directory

    Every regular file directly inside ``localedir`` is read as UTF-16
    text.  Each non-blank line must hold four tab-separated columns; the
    first is the lookup key and the last is the translated text (the two
    middle columns are ignored).

    Files are visited in sorted name order and the first translation seen
    for a key is the one that is kept; later duplicates are reported on
    stderr and dropped, which is what the warning message says.

    Returns a dict mapping each key to its translation.  Raises ValueError
    when a non-blank line does not have exactly four columns.
    '''
    translate = dict()
    # os.listdir() returns names in arbitrary order; sort them so that
    # "first one wins" gives the same result on every run
    for mapfile in sorted(os.listdir(localedir)):
        mappath = os.path.join(localedir, mapfile)
        if not os.path.isfile(mappath):
            # sub-directories cannot be opened as translation files
            continue
        with open(mappath, 'r', encoding='utf-16') as mapf:
            for lineno, line in enumerate(mapf, 1):
                line = line.strip('\n')
                if not line:
                    # tolerate blank lines instead of failing to unpack them
                    continue
                columns = line.split('\t')
                if len(columns) != 4:
                    raise ValueError(
                        f'{mappath}:{lineno}: expected 4 tab-separated '
                        f'columns, found {len(columns)}')
                orig, _empty, _number, match = columns
                if orig in translate:
                    print(f'Duplicate locale key "{orig}" = "{match}" (keeping "{translate[orig]}") (from {mappath})', file=sys.stderr)
                    continue
                translate[orig] = match
    return translate

def allowed(name, enabled, disabled):
    '''
    decide whether the table called ``name`` should be parsed

    When ``enabled`` is non-empty it acts as a whitelist of name prefixes
    and ``disabled`` is not consulted at all; otherwise every table is
    accepted unless its name starts with one of the ``disabled`` prefixes.
    '''
    if not enabled:
        # blacklist mode: one matching prefix is enough to reject
        for prefix in disabled:
            if name.startswith(prefix):
                return False
        return True
    # whitelist mode: one matching prefix is enough to accept
    for prefix in enabled:
        if name.startswith(prefix):
            return True
    return False

def parserForType(ctype, _cache=dict()):
    '''
    recursively construct a parser for a given .NET type

    Lookup order: the hand-written parsers in ``predefined``, then parsers
    built earlier (``_cache``), both keyed by ``ctype.Name``.  Otherwise a
    new parser is built: an ArrayParser around the element type's parser
    for arrays, an EnumParser for enums, and for anything else an
    ObjectParser over the type's public, serialized instance fields.

    ``_cache`` is a deliberately shared mutable default: it memoizes the
    parsers across calls and is not meant to be passed by callers.

    Raises Exception when a non-array, non-enum type has no usable fields.
    '''
    from System.Reflection import BindingFlags
    name = ctype.Name
    for known in (predefined, _cache):
        if name in known:
            return known[name]
    print(f'parser {ctype.Name}', file=sys.stderr)
    if ctype.IsArray:
        built = ArrayParser(parserForType(ctype.GetElementType()))
    elif ctype.IsEnum:
        built = EnumParser(ctype)
    else:
        wanted = BindingFlags.Public | BindingFlags.Instance
        serialized = [f for f in ctype.GetFields(wanted) if not f.IsNotSerialized]
        # fields are parsed in MetadataToken order -- presumably the
        # declaration order the data was serialized in (TODO confirm)
        serialized.sort(key=op.attrgetter('MetadataToken'))
        members = [(f.Name, parserForType(f.FieldType)) for f in serialized]
        if not members:
            raise Exception(f'Could not find fields for {ctype.Name}')
        built = ObjectParser(members)
    _cache[name] = built
    return built

class ObjectParser(object):
    '''
    parse a custom object field by field

    ``parsers`` is a sequence of (field name, parser) pairs.  Calling the
    instance runs every parser in that order with the arguments it was
    given and returns a dict mapping field name to parsed value.
    '''
    def __init__(self, parsers):
        self.parsers = parsers

    def __call__(self, *args, **kwargs):
        # dict comprehensions evaluate in iteration order, so the fields
        # are consumed from the stream in the order they are listed
        return {name: parse(*args, **kwargs) for name, parse in self.parsers}

class EnumParser(object):
    '''
    parse a 32-bit integer and express it in terms of a .NET enum

    For enums carrying FlagsAttribute the result is a list with the name
    of every enum value for which HasFlag() is true.  For other enums it
    is the name of the matching value, or the raw integer when the enum
    defines no such value.
    '''
    def __init__(self, enumtype):
        from System import Attribute, FlagsAttribute
        self.enumtype = enumtype
        flags_attr = Attribute.GetCustomAttribute(enumtype, FlagsAttribute)
        self.isflag = flags_attr is not None

    def __call__(self, reader, translate):
        from System import Enum
        raw = intParser(reader, translate)
        boxed = Enum.ToObject(self.enumtype, raw)
        if self.isflag:
            names = []
            for member in self.enumtype.GetEnumValues():
                if boxed.HasFlag(member):
                    names.append(str(member))
            return names
        if Enum.IsDefined(self.enumtype, boxed):
            return str(boxed)
        # value not declared by the enum: hand back the plain integer
        return raw

class ArrayParser(object):
    '''
    parse a length-prefixed array

    The stream holds a 32-bit little-endian signed element count followed
    by the elements themselves, each decoded by ``memberparser``.
    '''
    def __init__(self, memberparser):
        self.memberparser = memberparser

    def __call__(self, reader, translate):
        (count,) = struct.unpack('<i', reader.read(4))
        items = []
        for _ in range(count):
            items.append(self.memberparser(reader, translate))
        return items

def stringParser(reader, translate):
    '''
    parse a string from the datastream

    The string is stored as a 32-bit little-endian byte length followed by
    that many bytes (decoded with Python's default codec, UTF-8) and then
    zero to three pad bytes that round the string data up to a multiple of
    4 bytes, so the next value starts 4-byte aligned again.

    The decoded text is looked up in ``translate`` and replaced by its
    translation when one exists.  ``translate`` may be None or empty (no
    locale was loaded); the text is then returned untranslated instead of
    failing on ``None.get``.

    Raises struct.error when the stream ends before the whole string was
    read.
    '''
    length = struct.unpack('<i', reader.read(4))[0]
    pad = -length % 4
    fmt = f'<{length}s'
    if pad:
        # 'x' entries consume the alignment bytes without returning them
        fmt += f'{pad}x'
    val = struct.unpack(fmt, reader.read(length + pad))[0].decode()
    if not translate:
        return val
    return translate.get(val, val)

def intParser(reader, translate):
    '''
    read a signed 32-bit little-endian integer from the datastream

    ``translate`` is unused here.
    '''
    (value,) = struct.unpack('<i', reader.read(4))
    return value

def longParser(reader, translate):
    '''
    read a signed 64-bit little-endian integer from the datastream

    ``translate`` is unused here.
    '''
    (value,) = struct.unpack('<q', reader.read(8))
    return value

def boolParser(reader, translate):
    '''
    read a boolean stored in 4 bytes from the datastream

    Only the first byte carries the value; the other three are padding
    and are skipped.  ``translate`` is unused here.
    '''
    raw = reader.read(4)
    (flag,) = struct.unpack('<?xxx', raw)
    return flag

def floatParser(reader, translate):
    '''
    read a 32-bit little-endian float from the datastream

    ``translate`` is unused here.
    '''
    (value,) = struct.unpack('<f', reader.read(4))
    return value

def doubleParser(reader, translate):
    '''
    read a 64-bit little-endian double from the datastream

    ``translate`` is unused here.
    '''
    (value,) = struct.unpack('<d', reader.read(8))
    return value

# hand-written parsers for primitive .NET types, keyed by Type.Name;
# parserForType() consults this table before building anything itself
predefined = dict(
    String=stringParser,
    Int32=intParser,
    Int64=longParser,
    Boolean=boolParser,
    Single=floatParser,
    Double=doubleParser,
)

def parse_args(args=None):
    '''
    parse the command line into the keyword arguments expected by main()

    ``args`` is the list of argument strings; None makes argparse read
    sys.argv.  Returns a dict with the keys gamedir, locale, indent,
    disable and enable.

    ``locale`` is the translation table produced by loadlocale() for the
    directory given with --locale.  When the option is omitted it is an
    empty dict rather than None, so string parsing can still call
    ``.get`` on it and simply leaves every string untranslated.
    '''
    import argparse as ap
    parser = ap.ArgumentParser(description=__doc__)
    parser.add_argument('gamedir', help='Dyson Sphere Program directory ($DSP)')
    parser.add_argument(
        '-l', '--locale',
        type=loadlocale,
        # non-string defaults are used as-is (not passed through ``type``)
        default={},
        help='Locale to translate strings (probably $DSP/Locale/1033)')
    parser.add_argument(
        '-i', '--indent',
        type=int,
        default=None,
        help='JSON indentation level (default=%(default)s)')
    parser.add_argument(
        '-d', '--disable',
        action='append',
        # with action='append', prefixes given on the command line are
        # added to this default list; they do not replace it
        default=['Abnormality', 'Vege', 'Vein'],
        help='Disable certain ProtoSet tables')
    parser.add_argument(
        '-e', '--enable',
        action='append',
        help='Only enable specific ProtoSet tables')
    return vars(parser.parse_args(args))

if __name__ == '__main__':
    # run as a script: the parsed options become main()'s keyword arguments
    options = parse_args()
    main(**options)
🍪 We use cookies to keep session information to provide you a better experience.