#!/usr/bin/env python3
from collections import namedtuple
from itertools import groupby
from functools import partial
from warnings import warn
from os import SEEK_CUR
from io import BytesIO
# For 0.3 release
# TODO Improve diff or RLE algorithm - src = 1 2 1 2 1 2 -> dest = 1 1 1 1 1 1
# Widths, in bytes, of the fixed-size fields of an IPS file.
RECORD_HEADER_SIZE = 5      # length of the b"PATCH" magic at the start of a patch
RECORD_OFFSET_SIZE = 3      # big-endian target offset of a record
RECORD_SIZE_SIZE = 2        # big-endian size (or RLE run length) of a record
RECORD_RLE_DATA_SIZE = 1    # the single repeated byte of an RLE record

# Size limits derived from the field widths above.
MIN_PATCH = 14              # Header + Footer + Min_Record
MIN_RECORD = 6              # Minimum viable size of a record (offset + size + 1 data byte)
MIN_COMPRESS = 4            # Compressing a size 4 record only saves 1 byte
MAX_UNPATCHED = 1 << 24     # 16 MiB, the span addressable by a 3 byte offset
MAX_RECORD_SIZE = 0xFFFF    # Max value held in 2 bytes
class IpsRecord(namedtuple('IpsRecord', 'offset size rle_size data')):
    '''
    Data container for one record of an IPS file.

    A plain record has ``size > 0``, ``rle_size == 0`` and ``size`` bytes of
    data. An RLE record has ``size == 0``, ``rle_size > 0`` and exactly one
    byte of data that is repeated ``rle_size`` times when applied.

    :param offset: offset in first 3 bytes of the record, stored as int
    :param size: size in the next 2 bytes, stored as int
    :param rle_size: size in the next 2 bytes if previous was 0, stored as int
    :param data: bytes object of data with length 'size', or the single
                 repeated byte for an RLE record
    '''

    def last_byte(self):
        '''
        Calculate where the data written by this record ends.

        :returns: ``offset + size + rle_size``, i.e. the offset immediately
                  after the final byte this record writes.
        '''
        return self.offset + self.size + self.rle_size

    def inflate(self):
        '''
        Inflate the record if it is currently RLE compressed.

        :returns: self or inflated :class:`IpsRecord`
        '''
        if not self.rle_size:
            return self
        return IpsRecord(self.offset, self.rle_size, 0, self.data * self.rle_size)

    def compress(self):
        '''
        Attempts to RLE compress the record into a single, smaller, record. May
        split the record to multiple.

        Runs of at least ``MIN_COMPRESS`` identical bytes become RLE records;
        the bytes between such runs are kept as plain records.

        :returns: List of: self (no compression) or compressed :class:`IpsRecord`
        '''
        # Already an RLE record, or too short for compression to pay off.
        if not self.size or len(self.data) < MIN_COMPRESS:
            return [self]

        # (byte value, run length) for every run of identical bytes.
        runs = [(value, len(list(group))) for value, group in groupby(self.data)]
        if not any(length >= MIN_COMPRESS for _, length in runs):
            return [self]

        if len(runs) == 1:
            # The whole record is one run. Slice (not index) so that the data
            # stays a bytes object; indexing bytes would yield an int.
            return [IpsRecord(self.offset, 0, len(self.data), self.data[:1])]

        # NOTE: the offsets of the split records are not checked against the
        # reserved b'EOF' offset.
        pieces, literal, position = [], b'', self.offset
        for value, length in runs:
            if length >= MIN_COMPRESS:
                if literal:
                    # Emit the uncompressed bytes gathered before this run.
                    pieces.append(
                        IpsRecord(position - len(literal), len(literal), 0, literal))
                    literal = b''
                pieces.append(IpsRecord(position, 0, length, bytes([value])))
            else:
                literal += bytes([value]) * length
            position += length
        if literal:
            pieces.append(IpsRecord(position - len(literal), len(literal), 0, literal))
        return pieces

    def flatten(self):
        '''
        Serialize the record to its on-disk IPS representation.

        :returns: bytes of offset, size and data for a plain record, or
                  offset, zero size, RLE size and data for an RLE record.
        '''
        base = (self.offset).to_bytes(RECORD_OFFSET_SIZE, byteorder='big') + \
               (self.size).to_bytes(RECORD_SIZE_SIZE, byteorder='big')
        if self.size:
            return base + self.data
        return base + (self.rle_size).to_bytes(RECORD_SIZE_SIZE, byteorder='big') + self.data
class IpsyError(Exception):
    '''
    Raised by :func:`read` when IPS corruption is found (missing header or
    a patch that ends in the middle of a record).
    '''
def write(fhpatch, records):
    '''
    Writes out a list of :class:`IpsRecord` to a file.

    The output is the ``PATCH`` header, every record in the order given
    (serialized with its ``flatten`` method) and the ``EOF`` footer.

    :param fhpatch: File handler of the new patch file, opened for binary
                    writing
    :param records: List of :class:`IpsRecord`
    '''
    fhpatch.write(b"PATCH")
    for record in records:
        fhpatch.write(record.flatten())
    fhpatch.write(b"EOF")
def read(fhpatch, EOFcontinue=False):
    '''
    Read in an IPS file to a list of :class:`IpsRecord`

    A record whose size and RLE size are both zero is skipped with a warning.
    Data left after the terminating ``EOF`` marker triggers a warning and is
    ignored. A patch that simply ends on a record boundary without an ``EOF``
    marker is accepted without complaint.

    :param fhpatch: File handler for IPS patch
    :param EOFcontinue: Continue processing until the real EOF
                        is found (last 3 bytes of file)
    :returns: List of :class:`IpsRecord`
    :raises IpsyError: if the header is missing or the file ends in the
                       middle of a record
    '''
    records = []
    if fhpatch.read(RECORD_HEADER_SIZE) != b"PATCH":
        raise IpsyError(
            "IPS file missing header")

    for offset in iter(partial(fhpatch.read, RECORD_OFFSET_SIZE), b''):
        if offset == b'EOF':
            if not EOFcontinue:
                break
            # Peek ahead: if at least one minimal record still fits, treat
            # these three bytes as a record offset rather than the footer.
            if len(fhpatch.read(MIN_RECORD)) != MIN_RECORD:
                break
            fhpatch.seek(-MIN_RECORD, SEEK_CUR)

        size = fhpatch.read(RECORD_SIZE_SIZE)
        if (len(offset) != RECORD_OFFSET_SIZE) or (len(size) != RECORD_SIZE_SIZE):
            raise IpsyError(
                "IPS file unexpectedly ended")
        offset = int.from_bytes(offset, byteorder='big')
        size = int.from_bytes(size, byteorder='big')

        if size:
            # Plain record: 'size' literal bytes follow.
            data = fhpatch.read(size)
            if len(data) != size:
                raise IpsyError(
                    "IPS file unexpectedly ended")
            records.append(IpsRecord(offset, size, 0, data))
            continue

        # RLE record: a 2 byte run length followed by the repeated byte.
        rle_size = fhpatch.read(RECORD_SIZE_SIZE)
        if len(rle_size) != RECORD_SIZE_SIZE:
            # A short read here used to be decoded as if it were complete.
            raise IpsyError(
                "IPS file unexpectedly ended")
        rle_size = int.from_bytes(rle_size, byteorder='big')
        if rle_size == 0:
            warn("IPS file has record with both 0 size and 0 RLE size. " +
                 "Continuing to next record.")
            continue
        data = fhpatch.read(RECORD_RLE_DATA_SIZE)
        if data == b'':
            raise IpsyError(
                "IPS file unexpectedly ended")
        records.append(IpsRecord(offset, 0, rle_size, data))

    if fhpatch.read(1) != b'':
        warn("Data after EOF in IPS file. Truncating.")
    return records
def merge(fhpatch, *fhpatches, path_dst=None):
    '''
    Turns several IPS patches into one larger patch.

    The order that the patches are applied in is preserved.
    If the destination file is provided then further
    simplifications can be made.

    :param fhpatch: File Handler for resulting IPS file
    :param fhpatches: list of File Handlers for IPS files to
                      merge
    :param path_dst: Path to file that these patches are
                     intended to be used on.
    '''
    records = []
    # Every input patch contributes its records, including the last one.
    for fh in fhpatches:
        records.extend(read(fh, EOFcontinue=True))
    if path_dst:
        records = cleanup_records(records, path_dst)
    write(fhpatch, records)
def cleanup_records(ips_records, path_dst):
    '''
    Removes useless records and combines records when possible.

    The target file is loaded into memory, the records are applied to an
    in-memory copy, and the patched copy is diffed against the untouched
    contents to produce a fresh, RLE compressed record list.

    :param ips_records: List of :class:`IpsRecord`
    :param path_dst: Path to file that these patches are intended
                     to be used on.
    :returns: List of :class:`IpsRecord`, simplified where
              possible.
    '''
    # Binary mode: BytesIO only accepts bytes, and the file is not text.
    with open(path_dst, 'rb') as fh:
        original = fh.read()
    srcfh = BytesIO(original)
    dstfh = BytesIO(original)
    patch_from_records(dstfh, ips_records)
    # Patching leaves the cursor after the last write; the comparison must
    # start from the beginning of the patched data.
    dstfh.seek(0)
    return diff(srcfh, dstfh, fhpatch=None, rle=True)
def rle_compress(records):
    '''
    Attempt to RLE compress a collection of IPS records.

    Each record's ``compress`` method returns a list of one or more records;
    those lists are concatenated in order.

    :param records: List of :class:`IpsRecord` to compress
    :returns: RLE compressed list of :class:`IpsRecord`
    '''
    # TODO Improve this by compressing RLE records that sandwich a run of the
    # RLE data. Might be more trouble than it's worth.
    return [piece for record in records for piece in record.compress()]
def diff(fhsrc, fhdst, fhpatch=None, rle=False):
    '''
    Diff two files, attempt RLE compression, and write the IPS patch to a file.

    Both handles are read one byte at a time from their current positions,
    which are expected to correspond to each other. Record offsets are
    counted from the position of ``fhdst`` at the time of the call. Bytes of
    the patched file that lie beyond the end of the original file are emitted
    as records; bytes of the original beyond the end of the patched file are
    ignored.

    Offsets are not checked against the 3 byte limit of the IPS format.

    :param fhsrc: File handler of original file
    :param fhdst: File handler of the patched file
    :param fhpatch: File handler for IPS file
    :param rle: True if RLE compression should be used
    :returns: List of :class:`IpsRecord` that were written to the file.
    '''
    # A record starting at this offset would be encoded with the bytes b'EOF'
    # and be mistaken for the patch footer.
    eof_offset = int.from_bytes(b'EOF', byteorder='big')

    records = []
    run = bytearray()           # differing destination bytes being collected
    run_start = 0               # offset of the first byte held in 'run'
    position = fhdst.tell()     # offset of the byte currently being compared
    previous = b''              # destination byte just before 'position'

    def flush():
        # Turn the collected run into a plain record and start over.
        records.append(IpsRecord(run_start, len(run), 0, bytes(run)))
        run.clear()

    for dst_byte in iter(partial(fhdst.read, 1), b''):
        # Once the source is exhausted this is b'' and never equals dst_byte.
        src_byte = fhsrc.read(1)
        if src_byte == dst_byte:
            if run:
                flush()
        else:
            if len(run) == MAX_RECORD_SIZE:
                # Record is full: emit it and keep the current byte for the
                # next record instead of dropping it.
                flush()
            if not run:
                run_start = position
                if run_start == eof_offset and previous:
                    # Start one byte earlier, re-writing the preceding
                    # destination byte, so the offset is not b'EOF'.
                    run_start -= 1
                    run += previous
            run += dst_byte
        previous = dst_byte
        position += 1
    if run:
        flush()

    if len(records) == 0:
        warn("No differences found in files")
    if rle:
        records = rle_compress(records)
    if fhpatch:
        write(fhpatch, records)
    return records
def patch_from_records(fhdest, records):
    '''
    Apply a list of :class:`IpsRecord` to a file. Destructive process.

    Records are applied in the order given; RLE records are inflated before
    being written.

    :param fhdest: File handler to-be-patched
    :param records: Iterable of :class:`IpsRecord` to apply
    :returns: Number of records applied by the patch
    '''
    applied = 0
    for record in records:
        fhdest.seek(record.offset)
        fhdest.write(record.inflate().data)
        # Counting here (rather than len(records)) also supports iterators.
        applied += 1
    return applied
def patch(fhdest, fhpatch, EOFcontinue=False):
    '''
    Apply an IPS patch to a file. Destructive process.

    The patch is parsed with :func:`read` and the resulting records are
    applied with :func:`patch_from_records`.

    :param fhdest: File handler to-be-patched
    :param fhpatch: File handler of the patch
    :param EOFcontinue: Continue processing until the real EOF
                        is found (last 3 bytes of file)
    :returns: Number of records applied by the patch
    '''
    records = read(fhpatch, EOFcontinue)
    return patch_from_records(fhdest, records)
#from zlib import crc32
#from shutil import copyfileobj
#from os import SEEK_END, SEEK_SET
#from tempfile import TemporaryFile
#
#class UPS:
#
# RECORD_HEADER_SIZE = 4
#
# class UpsRecord( namedtuple('UpsRecord', 'skip xor') ):
# pass
#
# def crc_check( fhpatch, fhsrc, fhdst ):
# crcSrc, crcDst = crc32(fhsrc), crc32(fhdst)
# crcPat = crc32(fhpatch, ignorelast32=True)
# fhpatch.seek(-12, SEEK_END)
# fcrcSrc, fcrcDst, fcrcPat = list(iter(partial(fhpatch.read, 4), b''))
# # Do compare
#
# def read( fhpatch ):
# if fhpatch.read(RECORD_HEADER_SIZE) != b"UPS1":
# raise IpsyError(
# "UPS file missing header")
# fhpatch.seek(-12, SEEK_END)
# list(iter(partial(fhpatch.read, 4), b''))
# ## OTher stuff
#
# def varaible_int_read( fhpatch ):
# result, shift = 0, 0
# for chunk in iter(partial(fhpatch.read, 1), b''):
# chunk = int.from_bytes( chunk, byteorder='big' )
# if chunk & 0x80:
# result += (chunk & 0x7f) << shift
# break
# result += (chunk | 0x80) << shift
# shift += 7
# return result
#
# def crc32( fh, ignorelast32=False ):
# fh = strip_crc32(fh) if ignorelast32 else fh
# out = 0
# for chunk in iter(partial(fh.read, 16384), b''):
# out = zlib.crc32(chunk, out)
# return out & 0xFFFFFFFF
#
# # This is a bad idea for big files
# def strip_crc32( fh ):
# copyfh = TemporaryFile()
# copyfileobj(fh, copyfh, size)
# copyfh.seek(-4, SEEK_END)
# copyfh.truncate()