Source code for ipsy

#!/usr/bin/env python3

from collections import namedtuple, deque
from os import SEEK_CUR, remove
from itertools import groupby
from functools import partial
from shutil import copyfile
from warnings import warn
from uuid import uuid4

# For 0.2 release
# TODO EOF checking is only in data segment right now, check elsewhere too

# For 0.3 release
# TODO Improve diff or RLE algorithm - src = 1 2 1 2 1 2 -> dest = 1 1 1 1 1 1

# Widths, in bytes, of the fixed-size fields read and written by this module.
RECORD_HEADER_SIZE = len(b"PATCH")  # magic string at the start of a patch
RECORD_OFFSET_SIZE = 3              # big-endian record offset
RECORD_SIZE_SIZE = 2                # big-endian record size / RLE size
RECORD_RLE_DATA_SIZE = 1            # the single repeated byte of an RLE record

# Size thresholds and limits.
MIN_RECORD = 6                      # Minimum viable size of a record
MIN_PATCH = 14                      # Header + Footer + Min_Record
MIN_COMPRESS = 4                    # Compressing a size 4 record only saves 1 byte
MAX_UNPATCHED = 1 << 24             # 16 MiB, largest value we can offset to
MAX_RECORD_SIZE = 0xFFFF            # Max value held in 2 bytes

class IpsRecord(namedtuple('IpsRecord', 'offset size rle_size data')):
    '''
    Immutable container describing a single record of an IPS patch.

    :param offset: position in the target file the record is written to, as int
    :param size: length of the payload of a plain record, as int; 0 for an
                 RLE record
    :param rle_size: repeat count of an RLE record, as int; 0 for a plain
                     record
    :param data: bytes payload; 'size' bytes for a plain record, or the
                 single repeated byte for an RLE record
    '''

    def last_byte(self):
        '''
        Add the record's size and RLE size to its offset.

        For a record with only one of the two sizes set this is the position
        immediately after the final byte the record writes.

        :returns: offset + size + rle_size
        '''
        return sum((self.offset, self.size, self.rle_size))

    def inflate(self):
        '''
        Expand an RLE record into the equivalent plain record.

        :returns: the record itself when it is not RLE compressed, otherwise
                  a new plain :class:`IpsRecord` holding the repeated data
        '''
        if self.rle_size:
            expanded = self.data * self.rle_size
            return IpsRecord(self.offset, self.rle_size, 0, expanded)
        return self

    def compress(self):
        '''
        Try to turn the whole record into one RLE record.

        Only succeeds when the payload is longer than MIN_COMPRESS and made
        of one repeated byte value; the record is never split into pieces.

        :returns: the record itself, or a new RLE :class:`IpsRecord`
        '''
        # Already an RLE record (no plain size): nothing to do.
        if not self.size:
            return self
        payload = self.data
        uniform = len(payload) > MIN_COMPRESS and len(set(payload)) == 1
        if not uniform:
            return self
        return IpsRecord(self.offset, 0, len(payload), payload[:1])

    def flatten(self):
        # TODO return b'' string to help check for EOF
        return None
class IpsyError(Exception):
    '''
    Error raised by :func:`ips_read` when an IPS file is found to be corrupt.
    '''
def ips_write(fhpatch, records):
    '''
    Serialise a list of :class:`IpsRecord` into an IPS patch file.

    The output is the "PATCH" header, every record in order, then the "EOF"
    footer. Integer fields are written big-endian.

    :param fhpatch: File handler of the new patch file, opened for binary
                    writing
    :param records: List of :class:`IpsRecord`
    '''
    emit = fhpatch.write
    emit(b"PATCH")
    for record in records:
        emit(record.offset.to_bytes(RECORD_OFFSET_SIZE, byteorder='big'))
        # A zero size field is what marks the record as RLE encoded.
        size_field = (
            record.size.to_bytes(RECORD_SIZE_SIZE, byteorder='big')
            if record.size else b'\00\00'
        )
        emit(size_field)
        if record.rle_size:
            emit(record.rle_size.to_bytes(RECORD_SIZE_SIZE, byteorder='big'))
        emit(record.data)
    emit(b"EOF")
def ips_read(fhpatch, EOFcontinue=False):
    '''
    Read in an IPS file to a list of :class:`IpsRecord`

    :param fhpatch: File handler for IPS patch, opened for binary reading
    :param EOFcontinue: Continue processing until the real EOF is found
                        (last 3 bytes of file)
    :returns: List of :class:`IpsRecord`
    :raises IpsyError: if the header is missing or the file ends in the
                       middle of a record
    '''
    if fhpatch.read(RECORD_HEADER_SIZE) != b"PATCH":
        raise IpsyError("IPS file missing header")

    records = []
    for offset in iter(partial(fhpatch.read, RECORD_OFFSET_SIZE), b''):
        if offset == b'EOF':
            if not EOFcontinue:
                break
            # Peek ahead: if fewer than MIN_RECORD bytes remain, this marker
            # is treated as the real end of the patch. Otherwise rewind and
            # treat b'EOF' as an ordinary record offset.
            if len(fhpatch.read(MIN_RECORD)) != MIN_RECORD:
                break
            fhpatch.seek(-MIN_RECORD, SEEK_CUR)

        size = fhpatch.read(RECORD_SIZE_SIZE)
        if (len(offset) != RECORD_OFFSET_SIZE) or \
           (len(size) != RECORD_SIZE_SIZE):
            raise IpsyError("IPS file unexpectedly ended")
        offset = int.from_bytes(offset, byteorder='big')
        size = int.from_bytes(size, byteorder='big')

        if size == 0:
            # RLE record: a second size field gives the repeat count.
            rle_size = fhpatch.read(RECORD_SIZE_SIZE)
            # A short read here used to decode as 0 and be reported as an
            # empty record; it is really a truncated file.
            if len(rle_size) != RECORD_SIZE_SIZE:
                raise IpsyError("IPS file unexpectedly ended")
            rle_size = int.from_bytes(rle_size, byteorder='big')
            if rle_size == 0:
                warn("IPS file has record with both 0 size and 0 RLE size. "
                     "Continuing to next record.")
                continue
            data = fhpatch.read(RECORD_RLE_DATA_SIZE)
            if data == b'':
                raise IpsyError("IPS file unexpectedly ended")
            records.append(IpsRecord(offset, 0, rle_size, data))
        else:
            data = fhpatch.read(size)
            if len(data) != size:
                raise IpsyError("IPS file unexpectedly ended")
            records.append(IpsRecord(offset, size, 0, data))

    # Anything still readable at this point lies after the EOF marker.
    if fhpatch.read(1) != b'':
        warn("Data after EOF in IPS file. Truncating.")
    return records
def merge(fhpatch, *fhpatches, path_dst=None):
    '''
    Turns several IPS patches into one larger patch. The order that the
    patches are applied in is preserved. If the destination file is provided
    then further simplifications can be made.

    :param fhpatch: File Handler for resulting IPS file
    :param fhpatches: File Handlers for the IPS files to merge, in the order
                      they should be applied
    :param path_dst: Path to file that these patches are intended to be
                     used on.
    '''
    records = []
    # Every input patch is merged. The output handle is the separate
    # 'fhpatch' argument, so no entry of 'fhpatches' has to be skipped
    # (the last one used to be dropped silently).
    for fh in fhpatches:
        records += ips_read(fh, EOFcontinue=True)
    if path_dst:
        records = cleanup_records(records, path_dst)
    ips_write(fhpatch, records)
def cleanup_records(ips_records, path_dst):
    '''
    Removes useless records and combines records when possible.

    The records are applied to a scratch copy of the target file, and the
    result is diffed against an untouched copy to get a simplified record
    list. This function creates and deletes two temp files in the calling
    directory.

    :param ips_records: List of :class:`IpsRecord`
    :param path_dst: Path to file that these patches are intended to be
                     used on.
    :returns: List of :class:`IpsRecord`, simplified where possible.
    '''
    src_name, dst_name = str(uuid4()), str(uuid4())
    try:
        copyfile(path_dst, src_name)
        copyfile(path_dst, dst_name)
        # 'r+b' patches the copy in place; 'wb' would truncate it first and
        # the diff below would then compare against an empty file.
        with open(dst_name, 'r+b') as fh:
            patch_from_records(fh, ips_records)
        with open(src_name, 'rb') as fhsrc, open(dst_name, 'rb') as fhdst:
            return diff(fhsrc, fhdst, fhpatch=None, rle=True)
    finally:
        # Remove the scratch files even when patching or diffing fails.
        for name in (src_name, dst_name):
            try:
                remove(name)
            except FileNotFoundError:
                pass
def rle_compress(records):
    '''
    Attempt to RLE compress a collection of IPS records.

    Each record is first offered to :meth:`IpsRecord.compress`. A plain
    record that still contains a run of at least MIN_COMPRESS identical
    bytes is split into RLE records for those runs and plain records for
    the bytes in between.

    :param records: List of :class:`IpsRecord` to compress
    :returns: RLE compressed list of :class:`IpsRecord`
    '''
    compressed = []
    for record in records:
        record = record.compress()
        if record.rle_size or record.size < MIN_COMPRESS:
            compressed.append(record)
            continue

        # (byte value, run length) for each stretch of identical bytes.
        runs = [(value, len(list(group)))
                for value, group in groupby(record.data)]
        if all(length < MIN_COMPRESS for _, length in runs):
            compressed.append(record)
            continue

        cursor = record.offset   # file offset of the run being examined
        literal = b''            # pending bytes not worth RLE encoding
        for value, length in runs:
            if length < MIN_COMPRESS:
                literal += bytes([value]) * length
            else:
                if literal:
                    compressed.append(IpsRecord(
                        cursor - len(literal), len(literal), 0, literal))
                    literal = b''
                compressed.append(
                    IpsRecord(cursor, 0, length, bytes([value])))
            cursor += length
        if literal:
            compressed.append(IpsRecord(
                cursor - len(literal), len(literal), 0, literal))
    return compressed
def eof_check(fhpatch):
    '''
    Reviews an IPS patch to ensure it has only one EOF marker.

    Every occurrence of the bytes b'EOF' from the current position to the
    end of the file is counted, wherever it appears.

    :param fhpatch: File handler of IPS patch, opened for binary reading
    :returns: True if exactly one marker, else False
    '''
    # b'EOF' cannot overlap itself, so a non-overlapping count equals the
    # number of positions at which the marker occurs.
    return fhpatch.read().count(b'EOF') == 1
def diff(fhsrc, fhdst, fhpatch=None, rle=False):
    '''
    Diff two files, attempt RLE compression, and write the IPS patch to a
    file. Assumes both files are the same size; only the range covered by
    both files is compared.

    Both files are read fully into memory.

    :param fhsrc: File handler of original file, opened for binary reading
    :param fhdst: File handler of the patched file, opened for binary reading
    :param fhpatch: File handler for IPS file; nothing is written when None
    :param rle: True if RLE compression should be used
    :returns: List of :class:`IpsRecord` describing the differences
    :raises OverflowError: if a difference starts at an offset that does not
                           fit in RECORD_OFFSET_SIZE bytes
    '''
    src, dst = fhsrc.read(), fhdst.read()
    limit = min(len(src), len(dst))

    records, pos = [], 0
    while pos < limit:
        if src[pos] == dst[pos]:
            pos += 1
            continue

        start = pos
        # An offset that encodes as b'EOF' would be read back as the end of
        # the patch, so start the record one byte earlier. The extra byte is
        # taken from the destination file, so applying it is harmless.
        if start.to_bytes(RECORD_OFFSET_SIZE, byteorder='big') == b'EOF':
            start -= 1

        # Extend over the run of differing bytes, capped at the largest
        # size a record can hold. A longer run continues in the next record
        # without losing the byte at the boundary.
        while (pos < limit and src[pos] != dst[pos]
               and pos - start < MAX_RECORD_SIZE):
            pos += 1
        records.append(IpsRecord(start, pos - start, 0, dst[start:pos]))

    if len(records) == 0:
        warn("No differances found in files")
    elif rle:
        records = rle_compress(records)
    if fhpatch:
        ips_write(fhpatch, records)
    return records
def patch_from_records(fhdest, records):
    '''
    Write a list of :class:`IpsRecord` into a file. Destructive process.

    Each record is inflated (RLE expanded) and its data written at the
    record's offset, in list order.

    :param fhdest: File handler to-be-patched
    :param records: List of :class:`IpsRecord` to apply
    :returns: Number of records applied by the patch
    '''
    for record in records:
        target = record.offset
        fhdest.seek(target)
        payload = record.inflate().data
        fhdest.write(payload)
    applied = len(records)
    return applied
def patch(fhdest, fhpatch, EOFcontinue=False):
    '''
    Read an IPS patch and apply it to a file. Destructive process.

    :param fhdest: File handler to-be-patched
    :param fhpatch: File handler of the patch
    :param EOFcontinue: Continue processing until the real EOF is found
                        (last 3 bytes of file)
    :returns: Number of records applied by the patch
    '''
    return patch_from_records(fhdest, ips_read(fhpatch, EOFcontinue))