#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This module contains the base classes used
when defining mutation strategies for pfp
"""
import contextlib
import glob
import os
import six
from pfp.fields import BitfieldRW, NumberBase
from pfp.bitwrap import BitwrappedStream
from pfp.utils import timeit
# Placeholders for names that live in ``pfp.fuzz.strats``. They stay ``None``
# until ``init()`` runs, which imports that module lazily and rebinds all three.
get_strategy = None
StratGroup = None
FieldStrat = None
class Changer(object):
    """Overlay stacked changesets of rebuilt field data onto a base buffer.

    A private ``bytearray`` copy of the original data is kept unmodified.
    Each changeset is a list of ``(offset, data)`` pairs; :meth:`build`
    applies every changeset on the stack, oldest first, to a fresh copy of
    the original data.
    """

    def __init__(self, orig_data):
        """
        :param orig_data: The base data. It is copied into a ``bytearray``.
        """
        self._orig_data = bytearray(orig_data)
        self._change_set_stack = []

    @contextlib.contextmanager
    def change(self, field_set):
        """Intended to be used with a ``with`` block. Takes care of pushing
        and popping the changes, yields the modified data.

        :param field_set: Iterable of fields whose rebuilt data should be
            overlaid for the duration of the ``with`` block
        """
        self.push_changes(field_set)
        try:
            yield self.build()
        finally:
            # always drop the changeset again, even if the caller's block
            # (or the build itself) raised
            self.pop_changes()

    def push_changes(self, field_set):
        """Push a new changeset onto the changeset stack for the provided
        set of fields.

        :param field_set: Iterable of fields to rebuild and record
        """
        new_change_set = []
        for field in field_set:
            offset = field._pfp__offset
            if isinstance(field, NumberBase) and field.bitsize is not None:
                # bitfields cannot be built alone; build the whole group
                new_data = self._handle_bitfield(field)
            else:
                new_data = field._pfp__build()
            new_change_set.append((offset, new_data))

        # only recorded once every field built successfully
        self._change_set_stack.append(new_change_set)

    def pop_changes(self):
        """Discard the most recently pushed changeset.

        Nothing is returned; call :meth:`build` to get the resulting data.

        :raises IndexError: if the changeset stack is empty
        """
        self._change_set_stack.pop()

    def build(self):
        """Apply all changesets to a copy of the original data.

        :returns: a new ``bytearray``; the stored original is not modified
        """
        new_data = bytearray(self._orig_data)
        for change_set in self._change_set_stack:
            for offset, new_field_data in change_set:
                end = offset + len(new_field_data)
                new_data[offset:end] = new_field_data
        return new_data

    def _handle_bitfield(self, field):
        """Find the field's first evenly-aligned previous sibling that is
        also a bitfield, as well as all subsequent siblings until a full
        bit "class" is reached. Build the entire set of bitfields as a group.

        E.g.::

            ushort a:1;
            ushort b:3;
            ushort c:10;
            ushort d:2;

        This entire group should be built.

        NOTE(review): the walk over previous siblings only checks the bit
        offset, not the sibling's class or ``bitsize`` -- confirm this is
        intended against pfp's sibling/offset semantics.

        :param field: The bitfield whose group should be built
        :returns: the bytes written for the whole group
        """
        total_bits = field.width * 8

        def bit_offset(sibling):
            return total_bits - sibling._pfp__offset_bits

        # previous siblings (collected nearest-first, so reverse afterwards)
        fields_to_build = []
        curr_field = field._pfp__prev_sibling
        while curr_field is not None and bit_offset(curr_field) >= 0:
            fields_to_build.append(curr_field)
            curr_field = curr_field._pfp__prev_sibling
        fields_to_build.reverse()

        fields_to_build.append(field)

        # next siblings: same class, still bitfields, and still fitting
        # inside ``total_bits``
        curr_field = field._pfp__next_sibling
        while (
            curr_field is not None
            and isinstance(curr_field, field.__class__)
            and curr_field.bitsize is not None
        ):
            if bit_offset(curr_field) + curr_field.bitsize > total_bits:
                break
            fields_to_build.append(curr_field)
            curr_field = curr_field._pfp__next_sibling

        core_stream = six.BytesIO(b"")
        bit_stream = BitwrappedStream(core_stream)
        bitfield_rw = BitfieldRW(None, field.__class__)
        bitfield_rw.reserved_bits = field.bitfield_rw.reserved_bits

        for to_build in fields_to_build:
            # temporarily point the field at the scratch BitfieldRW so the
            # group is written into ``core_stream``; always put the field's
            # own one back, even if the build raises
            old_bitfield_rw = to_build.bitfield_rw
            to_build.bitfield_rw = bitfield_rw
            try:
                to_build._pfp__build(bit_stream)
            finally:
                to_build.bitfield_rw = old_bitfield_rw

        return core_stream.getvalue()
def init():
    """Bind the strategy helpers and import the built-in strategy modules.

    Rebinds the module-level ``get_strategy``, ``StratGroup`` and
    ``FieldStrat`` names from ``pfp.fuzz.strats`` (imported here rather than
    at module level), then imports every ``*.py`` file that sits next to this
    one, except ``__init__.py`` and ``base.py``, as ``pfp.fuzz.<name>``.
    """
    global get_strategy
    global StratGroup
    global FieldStrat

    import pfp.fuzz.strats

    get_strategy = pfp.fuzz.strats.get_strategy
    StratGroup = pfp.fuzz.strats.StratGroup
    FieldStrat = pfp.fuzz.strats.FieldStrat

    # load all of the built-in strategies
    strat_glob = os.path.join(os.path.dirname(__file__), "*.py")
    for strat_file in glob.glob(strat_glob):
        filename = os.path.basename(strat_file)
        if filename in ("__init__.py", "base.py"):
            continue
        # strip only the trailing extension; str.replace would also remove
        # a ".py" that appears in the middle of the file name
        mod_name = os.path.splitext(filename)[0]
        __import__("pfp.fuzz." + mod_name)
def changeset_mutate(field, strat_name_or_cls, num=100, at_once=1, yield_changed=False, fields_to_modify=None, base_data=None):
    """Mutate the provided field (probably a Dom or struct instance) using the
    strategy specified with ``strat_name_or_cls``, yielding ``num`` mutations
    that affect up to ``at_once`` fields at once.

    Unlike :func:`mutate`, the full output is built only once (or taken from
    ``base_data``); each iteration overlays just the rebuilt data of the
    modified fields onto it via a :class:`Changer` and yields the resulting
    data rather than the field. Modified fields are restored from their
    snapshots after each iteration.

    **NOTE** this does not yet work fully with packed structures
    (https://pfp.readthedocs.io/en/latest/metadata.html#packer-metadata)

    :param pfp.fields.Field field: The field to mutate (can be anything, not just Dom/Structs)
    :param strat_name_or_cls: Can be the name of a strategy, or the actual strategy class (not an instance)
    :param int num: The number of mutations to yield
    :param int at_once: The number of fields to mutate at once
    :param bool yield_changed: Yield the list of modified fields along with the modified data
    :param fields_to_modify: Optional iterable of fields to choose from; when ``None`` the
        strategy's ``which(field)`` decides
    :param base_data: Optional pre-built data to overlay changes on; when ``None`` it is
        obtained from ``field._pfp__build()``
    :returns: generator
    """
    import pfp.fuzz.rand as rand

    init()

    strat = get_strategy(strat_name_or_cls)
    if fields_to_modify is not None:
        to_mutate = fields_to_modify
    else:
        to_mutate = strat.which(field)

    # keep only the fields the strategy actually knows how to mutate
    with_strats = []
    for to_mutate_field in to_mutate:
        field_strat = strat.get_field_strat(to_mutate_field)
        if field_strat is not None:
            with_strats.append((to_mutate_field, field_strat))

    # we don't need these ones anymore
    del to_mutate

    # build it once at the beginning
    if base_data is not None:
        changer = Changer(base_data)
    else:
        changer = Changer(field._pfp__build())

    # modify `at_once` number of fields OR len(with_strats) number of fields,
    # whichever is lower
    fields_per_round = min(len(with_strats), at_once)

    for _ in six.moves.range(num):
        # bound before the ``try`` so the ``finally`` below never sees a
        # missing or stale list
        modified_fields = []
        try:
            # a list rather than a set: presumably rand.sample delegates to
            # random.sample, which rejects sets on Python 3.11+ -- TODO confirm
            idx_pool = list(six.moves.range(len(with_strats)))

            for _pick in six.moves.range(fields_per_round):
                # we'll never pull the same idx from idx_pool more than once
                # since we're removing the idx after choosing it
                rand_idx = rand.sample(idx_pool, 1)[0]
                idx_pool.remove(rand_idx)

                rand_field, field_strat = with_strats[rand_idx]
                rand_field._pfp__snapshot()
                # record the field as soon as it is snapshotted so it is
                # restored even if the mutation below raises
                modified_fields.append(rand_field)

                mutated_fields = field_strat.mutate(rand_field)
                modified_fields.extend(mutated_fields)

            with changer.change(modified_fields) as modified_data:
                if yield_changed:
                    yield modified_data, modified_fields
                else:
                    yield modified_data
        finally:
            for rand_field in modified_fields:
                rand_field._pfp__restore_snapshot()
def mutate(field, strat_name_or_cls, num=100, at_once=1, yield_changed=False):
    """Mutate the provided field (probably a Dom or struct instance) using the
    strategy specified with ``strat_name_or_cls``, yielding ``num`` mutations
    that affect up to ``at_once`` fields at once.

    This function will yield back the field after each mutation, optionally
    also yielding a ``set`` of fields that were mutated in that iteration (if ``yield_changed`` is
    ``True``). It should also be noted that the yielded set of changed fields *can*
    be modified and is no longer needed by the mutate() function.

    The field (and, recursively, its subfields) is snapshotted before each
    iteration and restored afterwards, so the same ``field`` object is
    yielded every time with different mutations applied.

    :param pfp.fields.Field field: The field to mutate (can be anything, not just Dom/Structs)
    :param strat_name_or_cls: Can be the name of a strategy, or the actual strategy class (not an instance)
    :param int num: The number of mutations to yield
    :param int at_once: The number of fields to mutate at once
    :param bool yield_changed: Yield a set of fields changed along with the mutated dom
    :returns: generator
    """
    import pfp.fuzz.rand as rand

    init()

    strat = get_strategy(strat_name_or_cls)
    to_mutate = strat.which(field)

    # keep only the fields the strategy actually knows how to mutate
    with_strats = []
    for to_mutate_field in to_mutate:
        field_strat = strat.get_field_strat(to_mutate_field)
        if field_strat is not None:
            with_strats.append((to_mutate_field, field_strat))

    # we don't need these ones anymore
    del to_mutate

    # modify `at_once` number of fields OR len(with_strats) number of fields,
    # whichever is lower
    fields_per_round = min(len(with_strats), at_once)

    for _ in six.moves.range(num):
        # save the current value of all subfields without
        # triggering events
        field._pfp__snapshot(recurse=True)

        try:
            chosen_fields = set()
            # a list rather than a set: presumably rand.sample delegates to
            # random.sample, which rejects sets on Python 3.11+ -- TODO confirm
            idx_pool = list(six.moves.range(len(with_strats)))

            for _pick in six.moves.range(fields_per_round):
                # we'll never pull the same idx from idx_pool more than once
                # since we're removing the idx after choosing it
                rand_idx = rand.sample(idx_pool, 1)[0]
                idx_pool.remove(rand_idx)

                rand_field, field_strat = with_strats[rand_idx]
                chosen_fields.add(rand_field)
                field_strat.mutate(rand_field)

            if yield_changed:
                yield field, chosen_fields
            else:
                # yield back the original field
                yield field
        finally:
            # restore the saved value of all subfields without
            # triggering events
            field._pfp__restore_snapshot(recurse=True)