#!/usr/bin/env python
#
# File: afl-cov
#
# Version: 0.5
#
# Purpose: Perform lcov coverage diff's against each AFL queue file to see
# new functions and line coverage evolve from an AFL fuzzing cycle.
#
# Copyright (C) 2015 Michael Rash (mbr@cipherdyne.org)
#
# License (GNU General Public License):
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02111-1301,
# USA
#
from shutil import rmtree
from sys import argv
import errno
import re
import glob
import string
import argparse
import time
import signal
import sys, os
try:
import subprocess32 as subprocess
except ImportError:
import subprocess
__version__ = '0.5'
WANT_OUTPUT = 1
NO_OUTPUT = 0
def main():
exit_success = 0
exit_failure = 1
cargs = parse_cmdline()
if cargs.version:
print "afl-cov-" + __version__
return exit_success
if cargs.stop_afl:
return not stop_afl(cargs)
if not validate_args(cargs):
return exit_failure
if cargs.func_search or cargs.line_search:
return not search_cov(cargs)
if cargs.background:
run_in_background()
if cargs.live:
is_afl_running(cargs)
return not process_afl_test_cases(cargs)
def run_in_background():
### could use the python 'daemon' module, but it isn't always
### installed, and we just need a basic backgrounding
### capability anyway
pid = os.fork()
if (pid < 0):
print "[*] fork() error, exiting."
os._exit()
elif (pid > 0):
os._exit(0)
else:
os.setsid()
return
def process_afl_test_cases(cargs):
rv = True
run_once = False
tot_files = 0
fuzz_dir = ''
afl_files = []
cov_paths = {}
curr_file = ''
curr_lcov_base = ''
curr_lcov_info = ''
curr_lcov_info_final = ''
### main coverage tracking dictionary
cov = {}
cov['zero'] = {}
cov['pos'] = {}
while True:
if not import_fuzzing_dirs(cov_paths, cargs):
rv = False
break
dir_ctr = 0
for fuzz_dir in cov_paths['dirs']:
num_files = 0
new_files = []
tmp_files = import_test_cases(fuzz_dir + '/queue')
dir_ctr += 1
for f in tmp_files:
if f not in afl_files:
afl_files.append(f)
new_files.append(f)
if new_files:
logr("\n*** Imported %d new test cases from: %s\n" \
% (len(new_files), (fuzz_dir + '/queue')),
cov_paths['log_file'], cargs)
for f in new_files:
out_lines = []
curr_cycle = get_cycle_num(num_files, cargs)
logr("[+] AFL test case: %s (%d / %d), cycle: %d" \
% (os.path.basename(f), num_files, len(afl_files),
curr_cycle), cov_paths['log_file'], cargs)
gen_paths(fuzz_dir, cov_paths, f, cargs)
if dir_ctr > 1 and curr_file \
and not cov_paths['dirs'][fuzz_dir]['prev_file']:
cov_paths['dirs'][fuzz_dir]['prev_file'] = curr_file
cov_paths['dirs'][fuzz_dir]['prev_lcov_base'] = curr_lcov_base
cov_paths['dirs'][fuzz_dir]['prev_lcov_info'] = curr_lcov_info
cov_paths['dirs'][fuzz_dir]['prev_lcov_info_final'] = curr_lcov_info_final
if cargs.coverage_cmd:
### execute the command to generate code coverage stats
### for the current AFL test case file
if run_once:
run_cmd(cargs.coverage_cmd.replace('AFL_FILE', f),
cov_paths, cargs, NO_OUTPUT)
else:
out_lines = run_cmd(cargs.coverage_cmd.replace('AFL_FILE', f),
cov_paths, cargs, WANT_OUTPUT)
run_once = True
### generate the code coverage stats for this test case
gen_coverage(fuzz_dir, cov_paths, cargs)
### diff to the previous code coverage, look for new
### lines/functions, and write out results
if cov_paths['dirs'][fuzz_dir]['prev_file']:
coverage_diff(curr_cycle, fuzz_dir, cov_paths,
f, cov, cargs)
if not cargs.disable_lcov_web and cargs.lcov_web_all:
gen_web_cov_report(fuzz_dir, cov_paths, cargs)
### log the output of the very first coverage command to
### assist in troubleshooting
if len(out_lines):
logr("\n\n++++++ BEGIN - first exec output for CMD: %s" % \
(cargs.coverage_cmd.replace('AFL_FILE', f)),
cov_paths['log_file'], cargs)
for line in out_lines:
logr(" %s" % (line), cov_paths['log_file'], cargs)
logr("++++++ END\n", cov_paths['log_file'], cargs)
if dir_ctr == 1:
curr_file = f
curr_lcov_base = cov_paths['dirs'][fuzz_dir]['lcov_base']
curr_lcov_info = cov_paths['dirs'][fuzz_dir]['lcov_info']
curr_lcov_info_final = cov_paths['dirs'][fuzz_dir]['lcov_info_final']
cov_paths['dirs'][fuzz_dir]['prev_file'] = f
num_files += 1
tot_files += 1
if cargs.afl_queue_id_limit \
and num_files > cargs.afl_queue_id_limit:
logr("[+] queue/ id limit of %d reached..." \
% cargs.afl_queue_id_limit,
cov_paths['log_file'], cargs)
break
if cargs.live:
if is_afl_fuzz_running(cargs):
if not len(new_files):
logr("[-] No new AFL test cases, sleeping for %d seconds" \
% cargs.sleep, cov_paths['log_file'], cargs)
time.sleep(cargs.sleep)
continue
else:
logr("[+] afl-fuzz appears to be stopped...",
cov_paths['log_file'], cargs)
break
else:
break
if tot_files > 0:
logr("[+] Processed %d / %d test cases.\n" \
% (tot_files, len(afl_files)),
cov_paths['log_file'], cargs)
### write out the final zero coverage and positive coverage reports
write_zero_cov(cov['zero'], cov_paths, cargs)
write_pos_cov(cov['pos'], cov_paths, cargs)
if not cargs.disable_lcov_web:
gen_web_cov_report(fuzz_dir, cov_paths, cargs)
else:
if rv:
logr("[*] Did not find any AFL test cases, exiting.\n",
cov_paths['log_file'], cargs)
rv = False
return rv
def coverage_diff(cycle_num, fuzz_dir, cov_paths, afl_file, cov, cargs):
log_lines = []
delta_log_lines = []
print_diff_header = 1
cp = cov_paths['dirs'][fuzz_dir]
a_file = os.path.basename(cp['prev_file'])
a_dir = os.path.basename(os.path.dirname(cp['prev_lcov_info_final']))
b_file = os.path.basename(afl_file)
b_dir = os.path.basename(fuzz_dir)
### with the coverage from the previous lcov results extracted
### the previous time we went through this function, we remove
### associated files unless instructed to keep them
if not cargs.preserve_all_lcov_files:
rm_prev_cov_files(cp)
new_cov = extract_coverage(cp['lcov_info_final'], cargs)
### We aren't interested in the number of times AFL has executed
### a line or function (since we can't really get this anyway because
### gcov stats aren't influenced by AFL directly) - what we want is
### simply whether a new line or function has been executed by this
### test case. So, we look for new positive coverage.
for f in new_cov['pos']:
print_filename = 1
if f not in cov['zero'] and f not in cov['pos']: ### completely new file
cov_init(f, cov)
if print_diff_header:
log_lines.append("diff %s/%s -> %s/%s" % \
(a_dir, a_file, b_dir, b_file))
print_diff_header = 0
for ctype in new_cov['pos'][f]:
for val in sorted(new_cov['pos'][f][ctype]):
cov['pos'][f][ctype][val] = ''
if print_filename:
log_lines.append("New src file: " + f)
print_filename = 0
log_lines.append(" New '" + ctype + "' coverage: " + val)
if ctype == 'line':
if cargs.coverage_include_lines:
delta_log_lines.append("%s, %s, %s, %s, %s\n" \
% (cp['id_file'], cycle_num, f, ctype, val))
else:
delta_log_lines.append("%s, %s, %s, %s, %s\n" \
% (cp['id_file'], cycle_num, f, ctype, val))
elif f in cov['zero'] and f in cov['pos']:
for ctype in new_cov['pos'][f]:
for val in sorted(new_cov['pos'][f][ctype]):
if val not in cov['pos'][f][ctype]:
cov['pos'][f][ctype][val] = ''
if print_diff_header:
log_lines.append("diff %s/%s -> %s/%s" % \
(a_dir, a_file, b_dir, b_file))
print_diff_header = 0
if print_filename:
log_lines.append("Src file: " + f)
print_filename = 0
log_lines.append(" New '" + ctype + "' coverage: " + val)
if ctype == 'line':
if cargs.coverage_include_lines:
delta_log_lines.append("%s, %s, %s, %s, %s\n" \
% (cp['id_file'], cycle_num, f, \
ctype, val))
else:
delta_log_lines.append("%s, %s, %s, %s, %s\n" \
% (cp['id_file'], cycle_num, f, \
ctype, val))
### now that new positive coverage has been added, reset zero
### coverage to the current new zero coverage
cov['zero'] = {}
cov['zero'] = new_cov['zero'].copy()
if len(log_lines):
logr("\n Coverage diff %s/%s %s/%s" \
% (a_dir, a_file, b_dir, b_file),
cov_paths['log_file'], cargs)
for l in log_lines:
logr(l, cov_paths['log_file'], cargs)
append_file(l, cp['diff'])
logr("", cov_paths['log_file'], cargs)
if len(delta_log_lines):
cfile = open(cov_paths['id_delta_cov'], 'a')
for l in delta_log_lines:
cfile.write(l)
cfile.close()
return
def write_zero_cov(zero_cov, cov_paths, cargs):
cpath = cov_paths['zero_cov']
logr("[+] Final zero coverage report: %s" % cpath,
cov_paths['log_file'], cargs)
cfile = open(cpath, 'w')
cfile.write("# All functions / lines in this file were never executed by any\n")
cfile.write("# AFL test case.\n")
cfile.close()
write_cov(cpath, zero_cov, cargs)
return
def write_pos_cov(pos_cov, cov_paths, cargs):
cpath = cov_paths['pos_cov']
logr("[+] Final positive coverage report: %s" % cpath,
cov_paths['log_file'], cargs)
cfile = open(cpath, 'w')
cfile.write("# All functions / lines in this file were executed by at\n")
cfile.write("# least one AFL test case. See the cov/id-delta-cov file\n")
cfile.write("# for more information.\n")
cfile.close()
write_cov(cpath, pos_cov, cargs)
return
def write_cov(cpath, cov, cargs):
cfile = open(cpath, 'a')
for f in cov:
cfile.write("File: %s\n" % f)
for ctype in sorted(cov[f]):
if ctype == 'function':
for val in sorted(cov[f][ctype]):
cfile.write(" %s: %s\n" % (ctype, val))
elif ctype == 'line':
if cargs.coverage_include_lines:
for val in sorted(cov[f][ctype], key=int):
cfile.write(" %s: %s\n" % (ctype, val))
cfile.close()
return
def rm_prev_cov_files(ct):
for cname in ['prev_lcov_base', 'prev_lcov_info',
'prev_lcov_info_tmp', 'prev_lcov_info_final']:
if cname in ct and os.path.exists(ct[cname]):
os.remove(ct[cname])
return
def write_status(status_file):
f = open(status_file, 'w')
f.write("afl_cov_pid : %d\n" % os.getpid())
f.write("afl_cov_version : %s\n" % __version__)
f.write("command_line : %s\n" % ' '.join(argv))
f.close()
return
def append_file(pstr, path):
f = open(path, 'a')
f.write("%s\n" % pstr)
f.close()
return
def cov_init(cfile, cov):
for k in ['zero', 'pos']:
if k not in cov:
cov[k] = {}
if cfile not in cov[k]:
cov[k][cfile] = {}
cov[k][cfile]['function'] = {}
cov[k][cfile]['line'] = {}
return
def extract_coverage(lcov_file, cargs):
search_rv = False
tmp_cov = {}
### populate old lcov output for functions/lines that were called
### zero times
with open(lcov_file, 'r') as f:
current_file = ''
for line in f:
line = line.strip()
m = re.search('SF:(\S+)', line)
if m and m.group(1):
current_file = m.group(1)
cov_init(current_file, tmp_cov)
continue
if current_file:
m = re.search('^FNDA:(\d+),(\S+)', line)
if m and m.group(2):
fcn = m.group(2) + '()'
if m.group(1) == '0':
### the function was never called
tmp_cov['zero'][current_file]['function'][fcn] = ''
else:
tmp_cov['pos'][current_file]['function'][fcn] = ''
continue
### look for lines that were never called
m = re.search('^DA:(\d+),(\d+)', line)
if m and m.group(1):
lnum = m.group(1)
if m.group(2) == '0':
### the line was never executed
tmp_cov['zero'][current_file]['line'][lnum] = ''
else:
tmp_cov['pos'][current_file]['line'][lnum] = ''
return tmp_cov
def search_cov(cargs):
search_rv = False
id_delta_file = cargs.afl_fuzzing_dir + '/cov/id-delta-cov'
log_file = cargs.afl_fuzzing_dir + '/cov/afl-cov.log'
with open(id_delta_file, 'r') as f:
for line in f:
line = line.strip()
### id:NNNNNN*_file, cycle, src_file, cov_type, fcn/line\n")
[id_file, cycle_num, src_file, cov_type, val] = line.split(', ')
if cargs.func_search and cov_type == 'function' and val == cargs.func_search:
if cargs.src_file:
if cargs.src_file == src_file:
logr("[+] Function '%s' in file: '%s' executed by: '%s', cycle: %s" \
% (val, current_file, id_file, cycle_num),
log_file, cargs)
search_rv = True
else:
logr("[+] Function '%s' executed by: '%s', cycle: %s" \
% (val, id_file, cycle_num),
log_file, cargs)
search_rv = True
if cargs.src_file == src_file \
and cargs.line_search and val == cargs.line_search:
if cargs.src_file == src_file:
logr("[+] Line '%s' in file: '%s' executed by: '%s', cycle: %s" \
% (val, current_file, id_file, cycle_num),
log_file, cargs)
search_rv = True
if not search_rv:
if cargs.func_search:
logr("[-] Function '%s' not found..." % cargs.func_search,
log_file, cargs)
elif cargs.line_search:
logr("[-] Line %s not found..." % cargs.line_search,
log_file, cargs)
return search_rv
def get_cycle_num(id_num, cargs):
### default cycle
cycle_num = 0
if not is_dir(cargs.afl_fuzzing_dir + '/plot_data'):
return cycle_num
with open(cargs.afl_fuzzing_dir + '/plot_data') as f:
for line in f:
### unix_time, cycles_done, cur_path, paths_total, pending_total,...
### 1427742641, 11, 54, 419, 45, 0, 2.70%, 0, 0, 9, 1645.47
vals = line.split(', ')
### test the id number against the current path
if vals[2] == str(id_num):
cycle_num = int(vals[1])
break
return cycle_num
def gen_coverage(fuzz_dir, cov_paths, cargs):
cp = cov_paths['dirs'][fuzz_dir]
out_lines = []
lcov_opts = ''
if cargs.enable_branch_coverage:
lcov_opts += ' --rc lcov_branch_coverage=1'
run_cmd(cargs.lcov_path \
+ lcov_opts
+ " --no-checksum --capture --initial" \
+ " --directory " + cargs.code_dir \
+ " --output-file " \
+ cp['lcov_base'], \
cov_paths, cargs, NO_OUTPUT)
run_cmd(cargs.lcov_path \
+ lcov_opts
+ " --no-checksum --capture --directory " \
+ cargs.code_dir + " --output-file " \
+ cp['lcov_info'], \
cov_paths, cargs, NO_OUTPUT)
if (cargs.disable_lcov_exclude_pattern):
out_lines = run_cmd(cargs.lcov_path \
+ lcov_opts
+ " --no-checksum -a " + cp['lcov_base'] \
+ " -a " + cp['lcov_info'] \
+ " --output-file " + cp['lcov_info_final'], \
cov_paths, cargs, WANT_OUTPUT)
else:
run_cmd(cargs.lcov_path \
+ lcov_opts
+ " --no-checksum -a " + cp['lcov_base'] \
+ " -a " + cp['lcov_info'] \
+ " --output-file " + cp['lcov_info_tmp'], \
cov_paths, cargs, NO_OUTPUT)
out_lines = run_cmd(cargs.lcov_path \
+ lcov_opts
+ " --no-checksum -r " + cp['lcov_info_tmp'] \
+ " " + cargs.lcov_exclude_pattern + " --output-file " \
+ cp['lcov_info_final'],
cov_paths, cargs, WANT_OUTPUT)
for line in out_lines:
m = re.search('^\s+(lines\.\..*\:\s.*)', line)
if m and m.group(1):
logr(" " + m.group(1), cov_paths['log_file'], cargs)
else:
m = re.search('^\s+(functions\.\..*\:\s.*)', line)
if m and m.group(1):
logr(" " + m.group(1), cov_paths['log_file'], cargs)
else:
if cargs.enable_branch_coverage:
m = re.search('^\s+(branches\.\..*\:\s.*)', line)
if m and m.group(1):
logr(" " + m.group(1),
cov_paths['log_file'], cargs)
return
def gen_web_cov_report(fuzz_dir, cov_paths, cargs):
cp = cov_paths['dirs'][fuzz_dir]
web_dir = cov_paths['web_dir']
os.mkdir(cp['lcov_web_dir'])
genhtml_opts = ''
if cargs.enable_branch_coverage:
genhtml_opts += ' --branch-coverage'
run_cmd(cargs.genhtml_path \
+ genhtml_opts
+ " --output-directory " \
+ cp['lcov_web_dir'] + " " \
+ cp['lcov_info_final'], \
cov_paths, cargs, NO_OUTPUT)
logr("[+] Final lcov web report: %s" \
% web_dir + '/lcov-web-final.html',
cov_paths['log_file'], cargs)
if cp['lcov_web_dir'][0] == '/':
os.symlink(cp['lcov_web_dir'] + '/index.html',
web_dir + '/lcov-web-final.html')
else:
os.symlink(os.getcwd() + '/' + cp['lcov_web_dir'] \
+ '/index.html', web_dir + '/lcov-web-final.html')
return
def is_afl_fuzz_running(cargs):
pid = None
stats_file = cargs.afl_fuzzing_dir + '/fuzzer_stats'
if os.path.exists(stats_file):
pid = get_running_pid(stats_file, 'fuzzer_pid\s+\:\s+(\d+)')
else:
for p in os.listdir(cargs.afl_fuzzing_dir):
stats_file = cargs.afl_fuzzing_dir + '/' + p + '/fuzzer_stats'
if os.path.exists(stats_file):
### allow a single running AFL instance in parallel mode
### to mean that AFL is running (and may be generating
### new code coverage)
pid = get_running_pid(stats_file, 'fuzzer_pid\s+\:\s+(\d+)')
if pid:
break
return pid
def get_running_pid(stats_file, pid_re):
pid = None
if not os.path.exists(stats_file):
return pid
with open(stats_file, 'r') as f:
for line in f:
line = line.strip()
### fuzzer_pid : 13238
m = re.search(pid_re, line)
if m and m.group(1):
is_running = int(m.group(1))
try:
os.kill(is_running, 0)
except OSError as e:
if e.errno == errno.EPERM:
pid = is_running
else:
pid = is_running
break
return pid
def gen_paths(fuzz_dir, cov_paths, afl_file, cargs):
basename = os.path.basename(afl_file)
basedir = os.path.basename(fuzz_dir)
cp = cov_paths['dirs'][fuzz_dir]
for k in ['diff_dir', 'web_dir', 'lcov_dir']:
if not is_dir(cov_paths[k] + '/' + basedir):
os.mkdir(cov_paths[k] + '/' + basedir)
### coverage diffs from one ID file to the next
cp['diff'] = cov_paths['diff_dir'] + '/' + basedir + '/' + basename
### current id:NNNNNN* test case file
cp['id_file'] = basedir + '/' + basename
### web files
cp['lcov_web_dir'] = cov_paths['web_dir'] + \
'/' + basedir + '/' + basename
### raw lcov files
cp['lcov_base'] = cov_paths['lcov_dir'] + \
'/' + basedir + '/' + basename + '.lcov_base'
cp['lcov_info'] = cov_paths['lcov_dir'] + \
'/' + basedir + '/' + basename + '.lcov_info'
cp['lcov_info_tmp'] = cov_paths['lcov_dir'] + '/' \
+ basedir + '/' + basename + '.lcov_info_tmp'
cp['lcov_info_final'] = cov_paths['lcov_dir'] + '/' \
+ basedir + '/' + basename + '.lcov_info_final'
if cp['prev_file']:
cp['prev_lcov_base'] = cov_paths['lcov_dir'] + '/' \
+ basedir + '/' + os.path.basename(cp['prev_file']) \
+ '.lcov_base'
cp['prev_lcov_info'] = cov_paths['lcov_dir'] + '/' \
+ basedir + '/' + os.path.basename(cp['prev_file']) \
+ '.lcov_info'
cp['prev_lcov_info_tmp'] = cov_paths['lcov_dir'] + '/' \
+ basedir + '/' + os.path.basename(cp['prev_file']) \
+ '.lcov_info_tmp'
cp['prev_lcov_info_final'] = cov_paths['lcov_dir'] + \
'/' + basedir + '/' + os.path.basename(cp['prev_file']) \
+ '.lcov_info_final'
return
def run_cmd(cmd, cov_paths, cargs, collect):
out = []
if cargs.verbose:
logr(" CMD: %s" % cmd, cov_paths['log_file'], cargs)
fh = None
if cargs.disable_cmd_redirection or collect == WANT_OUTPUT:
fh = open(cov_paths['tmp_out'], 'w')
else:
fh = open(os.devnull, 'w')
subprocess.call(cmd, stdin=None,
stdout=fh, stderr=subprocess.STDOUT, shell=True)
fh.close()
if cargs.disable_cmd_redirection or collect == WANT_OUTPUT:
with open(cov_paths['tmp_out'], 'r') as f:
for line in f:
out.append(line.rstrip('\n'))
return out
def import_fuzzing_dirs(cov_paths, cargs):
if not cargs.afl_fuzzing_dir:
print "[*] Must specify AFL fuzzing dir with --afl-fuzzing-dir or -d"
return False
if 'top_dir' not in cov_paths:
if not init_tracking(cov_paths, cargs):
return False
def_dir = cargs.afl_fuzzing_dir
if is_dir(def_dir + '/queue'):
if def_dir not in cov_paths['dirs']:
add_dir(def_dir, cov_paths)
else:
for p in os.listdir(def_dir):
fuzz_dir = def_dir + '/' + p
if is_dir(fuzz_dir):
if is_dir(fuzz_dir + '/queue'):
### found an AFL fuzzing directory instance
if fuzz_dir not in cov_paths['dirs']:
add_dir(fuzz_dir, cov_paths)
return True
def import_test_cases(qdir):
return sorted(glob.glob(qdir + "/id:*"))
def init_tracking(cov_paths, cargs):
cov_paths['dirs'] = {}
cov_paths['top_dir'] = cargs.afl_fuzzing_dir + '/cov'
cov_paths['web_dir'] = cov_paths['top_dir'] + '/web'
cov_paths['lcov_dir'] = cov_paths['top_dir'] + '/lcov'
cov_paths['diff_dir'] = cov_paths['top_dir'] + '/diff'
cov_paths['log_file'] = cov_paths['top_dir'] + '/afl-cov.log'
cov_paths['tmp_out'] = cov_paths['top_dir'] + '/cmd-out.tmp'
### global coverage results
cov_paths['id_delta_cov'] = cov_paths['top_dir'] + '/id-delta-cov'
cov_paths['zero_cov'] = cov_paths['top_dir'] + '/zero-cov'
cov_paths['pos_cov'] = cov_paths['top_dir'] + '/pos-cov'
if cargs.overwrite:
mkdirs(cov_paths, cargs)
else:
if is_dir(cov_paths['top_dir']):
if not cargs.func_search and not cargs.line_search:
print "[*] Existing coverage dir %s found, use --overwrite to " \
"re-calculate coverage" % (cov_paths['top_dir'])
return False
else:
mkdirs(cov_paths, cargs)
write_status(cov_paths['top_dir'] + '/afl-cov-status')
if not cargs.disable_coverage_init and cargs.coverage_cmd:
lcov_opts = ''
if cargs.enable_branch_coverage:
lcov_opts += ' --rc lcov_branch_coverage=1 '
### reset code coverage counters - this is done only once as
### afl-cov is spinning up even if AFL is running in parallel mode
run_cmd(cargs.lcov_path \
+ lcov_opts \
+ " --no-checksum --zerocounters --directory " \
+ cargs.code_dir, cov_paths, cargs, NO_OUTPUT)
return True
### credit:
### http://stackoverflow.com/questions/377017/test-if-executable-exists-in-python
def is_exe(fpath):
return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
def which(prog):
fpath, fname = os.path.split(prog)
if fpath:
if is_exe(prog):
return prog
else:
for path in os.environ["PATH"].split(os.pathsep):
path = path.strip('"')
exe_file = os.path.join(path, prog)
if is_exe(exe_file):
return exe_file
return None
def validate_args(cargs):
if cargs.coverage_cmd:
if 'AFL_FILE' not in cargs.coverage_cmd:
print "[*] --coverage-cmd must contain AFL_FILE"
return False
### make sure at least one component of the command is an
### executable
found_exec = False
for part in cargs.coverage_cmd.split(' '):
if part[0] == ' ' or part[0] == '-':
continue
if (which(part)):
return True
if not found_exec:
print "[*] Could not find an executable binary in " \
"--coverage-cmd '%s'" % cargs.coverage_cmd
return False
else:
if not cargs.func_search and not cargs.line_search:
print "[*] Must set --coverage-cmd or --func-search/--line-search"
return False
if cargs.code_dir:
if not is_dir(cargs.code_dir):
print "[*] --code-dir path does not exist"
return False
else:
if not cargs.func_search and not cargs.line_search:
print "[*] Must set --code-dir unless using --func-search " \
"against existing afl-cov directory"
return False
if cargs.func_search or cargs.line_search:
if not cargs.afl_fuzzing_dir:
print "[*] Must set --afl-fuzzing-dir"
return False
if cargs.func_search and '()' not in cargs.func_search:
cargs.func_search += '()'
if cargs.line_search and not cargs.src_file:
print "[*] Must set --src-file in --line-search mode"
return False
if cargs.live and not cargs.ignore_core_pattern:
if not check_core_pattern():
return False
if not cargs.live and not is_dir(cargs.afl_fuzzing_dir):
print "[*] It doesn't look like directory '%s' exists" \
% (cargs.afl_fuzzing_dir)
return False
if not which(cargs.lcov_path):
print "[*] lcov command does not exist: %s" % (cargs.lcov_path)
return False
if not cargs.disable_lcov_web and cargs.lcov_web_all:
if not which(cargs.genhtml_path):
print "[*] genhtml command does not exist: %s" % (cargs.genthml_path)
return False
return True
def is_afl_running(cargs):
while not is_dir(cargs.afl_fuzzing_dir):
if not cargs.background:
print "[-] Sleep for %d seconds for AFL fuzzing directory to be created..." \
% cargs.sleep
time.sleep(cargs.sleep)
### if we make it here then afl-fuzz is presumably running
while not is_afl_fuzz_running(cargs):
if not cargs.background:
print "[-] Sleep for %d seconds waiting for afl-fuzz to be started...." \
% cargs.sleep
time.sleep(cargs.sleep)
return
def add_dir(fdir, cov_paths):
cov_paths['dirs'][fdir] = {}
cov_paths['dirs'][fdir]['prev_file'] = ''
return
def mkdirs(cov_paths, cargs):
create_cov_dirs = 0
if is_dir(cov_paths['top_dir']):
if cargs.overwrite:
rmtree(cov_paths['top_dir'])
create_cov_dirs = 1
else:
create_cov_dirs = 1
if create_cov_dirs:
for k in ['top_dir', 'web_dir', 'lcov_dir', 'diff_dir']:
os.mkdir(cov_paths[k])
### write coverage results in the following format
cfile = open(cov_paths['id_delta_cov'], 'w')
cfile.write("# id:NNNNNN*_file, cycle, src_file, coverage_type, fcn/line\n")
cfile.close()
return
def is_dir(dpath):
return os.path.exists(dpath) and os.path.isdir(dpath)
def logr(pstr, log_file, cargs):
if not cargs.background and not cargs.quiet:
print " " + pstr
append_file(pstr, log_file)
return
def stop_afl(cargs):
rv = True
### note that this function only looks for afl-fuzz processes - it does not
### stop afl-cov processes since they will stop on their own after afl-fuzz
### is also stopped.
if not cargs.afl_fuzzing_dir:
print "[*] Must set --afl-fuzzing-dir"
return False
if not is_dir(cargs.afl_fuzzing_dir):
print "[*] Doesn't look like AFL fuzzing directory '%s' exists." \
% cargs.afl_fuzzing_dir
return False
if os.path.exists(cargs.afl_fuzzing_dir + '/fuzzer_stats'):
afl_pid = get_running_pid(cargs.afl_fuzzing_dir + '/fuzzer_stats',
'fuzzer_pid\s+\:\s+(\d+)')
if afl_pid:
print "[+] Stopping running afl-fuzz instance, PID: %d" % afl_pid
os.kill(afl_pid, signal.SIGTERM)
else:
print "[-] No running afl-fuzz instance"
rv = False
else:
found = False
for p in os.listdir(cargs.afl_fuzzing_dir):
stats_file = cargs.afl_fuzzing_dir + '/' + p + '/fuzzer_stats'
if os.path.exists(stats_file):
afl_pid = get_running_pid(stats_file, 'fuzzer_pid\s+\:\s+(\d+)')
if afl_pid:
print "[+] Stopping running afl-fuzz instance, PID: %d" \
% afl_pid
os.kill(afl_pid, signal.SIGTERM)
found = True
if not found:
print "[-] No running afl-fuzz instance"
rv = False
return rv
def check_core_pattern():
rv = True
core_pattern_file = '/proc/sys/kernel/core_pattern'
### check /proc/sys/kernel/core_pattern to see if afl-fuzz will
### accept it
if os.path.exists(core_pattern_file):
with open(core_pattern_file, 'r') as f:
if f.readline().rstrip()[0] == '|':
### same logic as implemented by afl-fuzz itself
print "[*] afl-fuzz requires 'echo core >%s'" \
% core_pattern_file
rv = False
return rv
def parse_cmdline():
p = argparse.ArgumentParser()
p.add_argument("-e", "--coverage-cmd", type=str,
help="Set command to exec (including args, and assumes code coverage support)")
p.add_argument("-d", "--afl-fuzzing-dir", type=str,
help="top level AFL fuzzing directory")
p.add_argument("-c", "--code-dir", type=str,
help="Directory where the code lives (compiled with code coverage support)")
p.add_argument("-O", "--overwrite", action='store_true',
help="Overwrite existing coverage results", default=False)
p.add_argument("--disable-cmd-redirection", action='store_true',
help="Disable redirection of command results to /dev/null",
default=False)
p.add_argument("--disable-lcov-web", action='store_true',
help="Disable generation of all lcov web code coverage reports",
default=False)
p.add_argument("--disable-coverage-init", action='store_true',
help="Disable initialization of code coverage counters at afl-cov startup",
default=False)
p.add_argument("--coverage-include-lines", action='store_true',
help="Include lines in zero-coverage status files",
default=False)
p.add_argument("--enable-branch-coverage", action='store_true',
help="Include branch coverage in code coverage reports (may be slow)",
default=False)
p.add_argument("--live", action='store_true',
help="Process a live AFL directory, and afl-cov will exit when it appears afl-fuzz has been stopped",
default=False)
p.add_argument("--sleep", type=int,
help="In --live mode, # of seconds to sleep between checking for new queue files",
default=60)
p.add_argument("--background", action='store_true',
help="Background mode - if also in --live mode, will exit when the alf-fuzz process is finished",
default=False)
p.add_argument("--lcov-web-all", action='store_true',
help="Generate lcov web reports for all id:NNNNNN* files instead of just the last one",
default=False)
p.add_argument("--preserve-all-lcov-files", action='store_true',
help="Keep all lcov files (not usually necessary)",
default=False)
p.add_argument("--disable-lcov-exclude-pattern", action='store_true',
help="Allow default /usr/include/* pattern to be included in lcov results",
default=False)
p.add_argument("--lcov-exclude-pattern", type=str,
help="Set exclude pattern for lcov results",
default="/usr/include/\*")
p.add_argument("--func-search", type=str,
help="Search for coverage of a specific function")
p.add_argument("--line-search", type=str,
help="Search for coverage of a specific line number (requires --src-file)")
p.add_argument("--src-file", type=str,
help="Restrict function or line search to a specific source file")
p.add_argument("--afl-queue-id-limit", type=int,
help="Limit the number of id:NNNNNN* files processed in the AFL queue/ directory",
default=0)
p.add_argument("--ignore-core-pattern", action='store_true',
help="Ignore the /proc/sys/kernel/core_pattern setting in --live mode",
default=False)
p.add_argument("--lcov-path", type=str,
help="Path to lcov command", default="/usr/bin/lcov")
p.add_argument("--genhtml-path", type=str,
help="Path to genhtml command", default="/usr/bin/genhtml")
p.add_argument("--stop-afl", action='store_true',
help="Stop all running afl-fuzz instances associated with --afl-fuzzing-dir <dir>",
default=False)
p.add_argument("-v", "--verbose", action='store_true',
help="Verbose mode", default=False)
p.add_argument("-V", "--version", action='store_true',
help="Print version and exit", default=False)
p.add_argument("-q", "--quiet", action='store_true',
help="Quiet mode", default=False)
return p.parse_args()
if __name__ == "__main__":
sys.exit(main())