阅读源码时所用的版本:ClusterFuzz v2.0.1
首先运行bot的入口是python butler.py run_bot
args = parser.parse_args()
if not args.command:
parser.print_help()
return
_setup()
command = importlib.import_module('local.butler.%s' % args.command)
command.execute(args)
之后执行的是src/local/butler/run_bot.py
中的execute
def execute(args):
  """Run the bot.

  Sets up the bot directory and the environment/configs, then launches
  `src/python/bot/startup/run_bot.py` as a child process from inside
  `<args.directory>/clusterfuzz` and waits for it to finish.

  Args:
    args: Parsed butler command-line arguments; `args.directory` is read here.
  """
  appengine_path = appengine.find_sdk_path()
  _setup_bot_directory(args)
  _setup_environment_and_configs(args, appengine_path)
  try:
    os.chdir(os.path.join(args.directory, 'clusterfuzz'))
    proc = common.execute_async('python src/python/bot/startup/run_bot.py')

    def _stop_handler(*_):
      # Kill the child bot process when this wrapper is asked to stop.
      print('Bot has been stopped. Exit.')
      proc.kill()

    # SIGTERM and Ctrl-C (KeyboardInterrupt below) both go through
    # _stop_handler.
    signal.signal(signal.SIGTERM, _stop_handler)
    common.process_proc_output(proc)
    proc.wait()
  except KeyboardInterrupt:
    # NOTE(review): _stop_handler only exists once execute_async() has
    # returned; an interrupt before that point would hit an unbound name.
    _stop_handler()
之后就是执行src/python/bot/startup/run_bot.py
,而这里面是有定义main函数的
# Script entry point: run main() inside an NDB context and turn the outcome
# into a process exit code (0 when main() returns, 1 on any Exception).
if __name__ == '__main__':
  if sys.version_info.major == 3:
    # TODO(ochang): Remove check once all migrated to Python 3.
    multiprocessing.set_start_method('spawn')
  try:
    with ndb_init.context():
      main()
    exit_code = 0
  except Exception:
    traceback.print_exc()
    exit_code = 1
  monitor.stop()
  # Prevent python GIL deadlocks on shutdown. See https://crbug.com/744680.
  os._exit(exit_code)  # pylint: disable=protected-access
上面就是一些初始化就执行main函数,我们来看main函数
def main():
  """Prepare the configuration options and start requesting tasks.

  Returns without doing anything if ROOT_DIR is not set. Otherwise initializes
  the bot environment, monitoring, profiler and fuzzing engines, then calls
  task_loop() repeatedly until it exits cleanly or reports an error listed in
  errors.BOT_ERROR_TERMINATION_LIST.
  """
  logs.configure('run_bot')
  root_directory = environment.get_value('ROOT_DIR')
  if not root_directory:
    print('Please set ROOT_DIR environment variable to the root of the source '
          'checkout before running. Exiting.')
    print('For an example, check init.bash in the local directory.')
    return
  dates.initialize_timezone_from_environment()
  environment.set_bot_environment()
  monitor.initialize()
  if not profiler.start_if_needed('python_profiler_bot'):
    sys.exit(-1)
  # Registers the builtin fuzzing engines (see fuzzers_init.run).
  fuzzers_init.run()
  if environment.is_trusted_host(ensure_connected=False):
    from bot.untrusted_runner import host
    host.init()
  if environment.is_untrusted_worker():
    # Track revision since we won't go into the task_loop.
    update_task.track_revision()
    from bot.untrusted_runner import untrusted as untrusted_worker
    untrusted_worker.start_server()
    assert False, 'Unreachable code'
  while True:
    # task_loop should be an infinite loop,
    # unless we run into an exception.
    # Reader's note: task_loop is the core of the bot.
    error_stacktrace, clean_exit, task_payload = task_loop()
    # Print the error trace to the console.
    if not clean_exit:
      print('Exception occurred while running "%s".' % task_payload)
      print('-' * 80)
      print(error_stacktrace)
      print('-' * 80)
    should_terminate = (
        clean_exit or errors.error_in_list(error_stacktrace,
                                           errors.BOT_ERROR_TERMINATION_LIST))
    if should_terminate:
      return
    logs.log_error(
        'Task exited with exception (payload="%s").' % task_payload,
        error_stacktrace=error_stacktrace)
    should_hang = errors.error_in_list(error_stacktrace,
                                       errors.BOT_ERROR_HANG_LIST)
    if should_hang:
      logs.log('Start hanging forever.')
      while True:
        # Sleep to avoid consuming 100% of CPU.
        time.sleep(60)
        # See if our run timed out, if yes bail out.
        if data_handler.bot_run_timed_out():
          return
    # Clear the current exception.
    utils.exc_clear()
看到task_loop()函数,task = tasks.get_task()
获取任务,commands.process_command(task)
执行命令并删除任务
def task_loop():
  """Executes tasks indefinitely.

  Each iteration resets the environment, runs the regular update step, fetches
  a task and processes it under a monitor and a task lease. The loop only
  ends after a SystemExit or another Exception (AlreadyRunningError excepted).

  Returns:
    A tuple (stacktrace, clean_exit, task_payload): the formatted traceback of
    the current exception, whether the exit was a SystemExit with code 0, and
    the payload of the last task (None if there was no task).
  """
  clean_exit = False
  while True:
    exception_occurred = False
    task = None
    # This caches the current environment on first run. Don't move this.
    environment.reset_environment()
    try:
      # Run regular updates.
      update_task.run()
      update_task.track_revision()
      # Fetch the next task to work on.
      task = tasks.get_task()
      if not task:
        continue
      with _Monitor(task):
        with task.lease():
          # Execute the command and delete the task.
          commands.process_command(task)
    except SystemExit as e:
      exception_occurred = True
      clean_exit = (e.code == 0)
      if not clean_exit and not isinstance(e, untrusted.HostException):
        logs.log_error('SystemExit occurred while working on task.')
    except commands.AlreadyRunningError:
      # Not treated as a failure: keep looping.
      exception_occurred = False
    except Exception:
      logs.log_error('Error occurred while working on task.')
      exception_occurred = True
    if exception_occurred:
      # Prevent looping too quickly. See: crbug.com/644830
      failure_wait_interval = environment.get_value('FAIL_WAIT')
      time.sleep(utils.random_number(1, failure_wait_interval))
      break
  task_payload = task.payload() if task else None
  return traceback.format_exc(), clean_exit, task_payload
我们先看get_task函数,可以看到这里除了fuzz的任务,还有其他任务
def get_task():
  """Get a task.

  Sources are tried in this order: command override, the ML jobs queue (only
  when ML is set), the high-end queue (when THREAD_MULTIPLIER > 1), the
  regular queue, and finally a fuzz task. The high-end and regular queues are
  skipped when PREEMPTIBLE is set.

  Returns:
    The first task found, or the result of get_fuzz_task() (possibly falsy).
  """
  task = get_command_override()
  if task:
    return task
  # TODO(unassigned): Remove this hack.
  if environment.get_value('ML'):
    return get_regular_task(queue=ML_JOBS_TASKQUEUE)
  allow_all_tasks = not environment.get_value('PREEMPTIBLE')
  if allow_all_tasks:
    # Check the high-end jobs queue for bots with multiplier greater than 1.
    thread_multiplier = environment.get_value('THREAD_MULTIPLIER')
    if thread_multiplier and thread_multiplier > 1:
      task = get_high_end_task()
      if task:
        return task
    task = get_regular_task()
    if task:
      return task
  task = get_fuzz_task()
  if not task:
    logs.log_error('Failed to get any fuzzing tasks. This should not happen.')
    time.sleep(TASK_EXCEPTION_WAIT_INTERVAL)
  return task
我们还是比较关心fuzz,看get_fuzz_task,这里是获取argument和job
def get_fuzz_task():
  """Try to get a fuzz task.

  Returns:
    A Task with command 'fuzz' built from the selected (argument, job) pair,
    or None when no argument was selected.
  """
  argument, job = fuzzer_selection.get_fuzz_task_payload()
  if not argument:
    return None
  return Task('fuzz', argument, job)
继续跟fuzzer_selection.get_fuzz_task_payload(),这里是去谷歌云那边查询当前平台对应的FuzzerJob,最后按权重(actual_weight)随机选取一个返回
def get_fuzz_task_payload(platform=None):
  """Select a fuzzer that can run on this platform.

  Args:
    platform: Platform to select for. When falsy, QUEUE_OVERRIDE is used if
      set, otherwise environment.platform().

  Returns:
    A (fuzzer, job) tuple taken from the chosen FuzzerJob mapping, or
    (None, None) when no mapping exists for the platform.
  """
  if not platform:
    queue_override = environment.get_value('QUEUE_OVERRIDE')
    platform = queue_override if queue_override else environment.platform()
  query = data_types.FuzzerJob.query()
  query = query.filter(data_types.FuzzerJob.platform == platform)
  mappings = list(ndb_utils.get_all_from_query(query))
  if not mappings:
    return None, None
  # Random pick among the mappings, weighted by their 'actual_weight'.
  selection = utils.random_weighted_choice(
      mappings, weight_attribute='actual_weight')
  return selection.fuzzer, selection.job
# pylint: disable=too-many-nested-blocks
# TODO(mbarbella): Rewrite this function to avoid nesting issues.
@set_task_payload
def process_command(task):
"""Figures out what to do with the given task and executes the command."""
logs.log("Executing command '%s'" % task.payload())
if not task.payload().strip():
logs.log_error('Empty task received.')
return
# Parse task payload.
task_name = task.command
task_argument = task.argument
job_name = task.job
......
......
......
# Match the cpu architecture with the ones required in the job definition.
# If they don't match, then bail out and recreate task.
if not is_supported_cpu_arch_for_job():
logs.log(
'Unsupported cpu architecture specified in job definition, exiting.')
tasks.add_task(task_name, task_argument, job_name)
return
# Initial cleanup.
cleanup_task_state()
start_web_server_if_needed()
try:
run_command(task_name, task_argument, job_name) # 运行命令
finally:
# Final clean up.
cleanup_task_state()
看run_command,实际是task_module.execute_task(task_argument, job_name)
执行
def run_command(task_name, task_argument, job_name):
  """Run the command.

  Looks up the task module in COMMAND_MAP and calls its execute_task(). For
  tasks where should_update_task_status() is true, the task status is moved
  through STARTED, then ERROR or FINISHED.

  Args:
    task_name: Key into COMMAND_MAP; unknown names are logged and ignored.
    task_argument: First argument passed to the task's execute_task().
    job_name: Second argument passed to the task's execute_task().

  Raises:
    AlreadyRunningError: If the STARTED status update is refused.
  """
  if task_name not in COMMAND_MAP:
    logs.log_error("Unknown command '%s'" % task_name)
    return
  task_module = COMMAND_MAP[task_name]
  # If applicable, ensure this is the only instance of the task running.
  task_state_name = ' '.join([task_name, task_argument, job_name])
  if should_update_task_status(task_name):
    if not data_handler.update_task_status(task_state_name,
                                           data_types.TaskState.STARTED):
      logs.log('Another instance of "{}" already '
               'running, exiting.'.format(task_state_name))
      raise AlreadyRunningError
  try:
    # Execute the task.
    task_module.execute_task(task_argument, job_name)
  except errors.InvalidTestcaseError:
    # It is difficult to try to handle the case where a test case is deleted
    # during processing. Rather than trying to catch by checking every point
    # where a test case is reloaded from the datastore, just abort the task.
    logs.log_error('Test case %s no longer exists.' % task_argument)
  except BaseException:
    # On any other exceptions, update state to reflect error and re-raise.
    if should_update_task_status(task_name):
      data_handler.update_task_status(task_state_name,
                                      data_types.TaskState.ERROR)
    raise
  # Task completed successfully.
  # Also reached after InvalidTestcaseError, which is swallowed above.
  if should_update_task_status(task_name):
    data_handler.update_task_status(task_state_name,
                                    data_types.TaskState.FINISHED)
其实一开始判断task_name是否在COMMAND_MAP中,可以看到除了fuzz任务外,还有很多任务
# Maps a task command name to the task module that run_command() dispatches
# to via `execute_task`.
COMMAND_MAP = {
    'analyze': analyze_task,
    'blame': blame_task,
    'corpus_pruning': corpus_pruning_task,
    'fuzz': fuzz_task,
    'impact': impact_task,
    'minimize': minimize_task,
    'ml_train': ml_train_task,
    'progression': progression_task,
    'regression': regression_task,
    'symbolize': symbolize_task,
    'unpack': unpack_task,
    'upload_reports': upload_reports_task,
    'variant': variant_task,
}
继续跟task_module.execute_task,我们关注fuzz的吧,就是fuzz_task.execute_task
def execute_task(fuzzer_name, job_type):
  """Runs the given fuzzer for one round.

  Args:
    fuzzer_name: Fuzzer name, passed to FuzzingSession.
    job_type: Job type, passed to FuzzingSession.
  """
  # The test timeout comes from the TEST_TIMEOUT environment value.
  test_timeout = environment.get_value('TEST_TIMEOUT')
  session = FuzzingSession(fuzzer_name, job_type, test_timeout)
  session.run()
先获取超时,之后初始化FuzzingSession,初始化代码如下:
class FuzzingSession(object):
  """Class for orchestrating fuzzing sessions."""

  def __init__(self, fuzzer_name, job_type, test_timeout):
    """Store the task identity and pick the per-session fuzzing parameters.

    Args:
      fuzzer_name: Name of the fuzzer for this session.
      job_type: Job type for this session.
      test_timeout: Base timeout; adjusted by the picked timeout multiplier.
    """
    self.fuzzer_name = fuzzer_name
    self.job_type = job_type
    # Set up randomly selected fuzzing parameters.
    self.redzone = pick_redzone()
    self.disable_ubsan = pick_ubsan_disabled(job_type)
    self.timeout_multiplier = pick_timeout_multiplier()
    self.window_argument = pick_window_argument()
    self.test_timeout = set_test_timeout(test_timeout, self.timeout_multiplier)
    # Set up during run().
    self.testcase_directory = None
    self.data_directory = None
    # Fuzzing engine specific state.
    self.fuzz_target = None
    self.gcs_corpus = None
run的代码
def run(self):
  """Run the fuzzing session.

  Steps, in order: set up the fuzzer and data bundles, set up the build, check
  for a bad build, resolve the data bundle directory, fuzz (engine fuzzing if
  engine.get() returns an engine for the fuzzer, blackbox fuzzing otherwise),
  process the crashes, upload stats, and delete the generated testcases.
  Every setup failure returns early after recording the failure.
  """
  failure_wait_interval = environment.get_value('FAIL_WAIT')
  # Update LSAN local blacklist with global blacklist.
  is_lsan_enabled = environment.get_value('LSAN')
  if is_lsan_enabled:
    leak_blacklist.copy_global_to_local_blacklist()
  # For some binaries, we specify trials, which are sets of flags that we only
  # apply some of the time. Adjust APP_ARGS for them if needed.
  trials.setup_additional_args_for_app()
  # Ensure that the fuzzer still exists.
  logs.log('Setting up fuzzer and data bundles.')
  fuzzer = data_types.Fuzzer.query(
      data_types.Fuzzer.name == self.fuzzer_name).get()
  if not fuzzer or not setup.update_fuzzer_and_data_bundles(self.fuzzer_name):
    _track_fuzzer_run_result(self.fuzzer_name, 0, 0,
                             FuzzErrorCode.FUZZER_SETUP_FAILED)
    logs.log_error('Unable to setup fuzzer %s.' % self.fuzzer_name)
    # Artificial sleep to slow down continuous failed fuzzer runs if the bot
    # is using command override for task execution.
    time.sleep(failure_wait_interval)
    return
  self.testcase_directory = environment.get_value('FUZZ_INPUTS')
  # Set up a custom or regular build based on revision. By default, fuzzing
  # is done on trunk build (using revision=None). Otherwise, a job definition
  # can provide a revision to use via |APP_REVISION|.
  target_weights = fuzzer_selection.get_fuzz_target_weights()
  build_setup_result = build_manager.setup_build(
      environment.get_value('APP_REVISION'), target_weights=target_weights)
  # Check if we have an application path. If not, our build failed
  # to setup correctly.
  if not build_setup_result or not build_manager.check_app_path():
    _track_fuzzer_run_result(self.fuzzer_name, 0, 0,
                             FuzzErrorCode.BUILD_SETUP_FAILED)
    return
  dataflow_bucket_path = environment.get_value('DATAFLOW_BUILD_BUCKET_PATH')
  if dataflow_bucket_path:
    # Some fuzzing jobs may use auxiliary builds, such as DFSan instrumented
    # builds accompanying libFuzzer builds to enable DFT-based fuzzing.
    # A failure here is only logged; the session continues.
    if not build_manager.setup_trunk_build(
        [dataflow_bucket_path], build_prefix='DATAFLOW'):
      logs.log_error('Failed to set up dataflow build.')
  # Save fuzz targets count to aid with CPU weighting.
  self._save_fuzz_targets_count()
  # Check if we have a bad build, i.e. one that crashes on startup.
  # If yes, bail out.
  logs.log('Checking for bad build.')
  crash_revision = environment.get_value('APP_REVISION')
  is_bad_build = testcase_manager.check_for_bad_build(self.job_type,
                                                      crash_revision)
  _track_build_run_result(self.job_type, crash_revision, is_bad_build)
  if is_bad_build:
    return
  # Data bundle directories can also have testcases which are kept in-place
  # because of dependencies.
  self.data_directory = setup.get_data_bundle_directory(self.fuzzer_name)
  if not self.data_directory:
    _track_fuzzer_run_result(self.fuzzer_name, 0, 0,
                             FuzzErrorCode.DATA_BUNDLE_SETUP_FAILED)
    logs.log_error(
        'Unable to setup data bundle %s.' % fuzzer.data_bundle_name)
    return
  # Engine fuzzing when an engine is registered under the fuzzer's name,
  # blackbox fuzzing otherwise.
  engine_impl = engine.get(fuzzer.name)
  if engine_impl:
    crashes, fuzzer_metadata = self.do_engine_fuzzing(engine_impl)
    # Not applicable to engine fuzzers.
    testcase_file_paths = []
    testcases_metadata = {}
  else:
    fuzzer_directory = setup.get_fuzzer_directory(self.fuzzer_name)
    fuzzer_metadata, testcase_file_paths, testcases_metadata, crashes = (
        self.do_blackbox_fuzzing(fuzzer, fuzzer_directory, self.job_type))
  if crashes is None:
    # Error occurred in generate_blackbox_testcases.
    # TODO(ochang): Pipe this error a little better.
    return
  logs.log('Finished processing test cases.')
  platform = environment.platform()
  platform_id = environment.get_platform_id()
  # For Android, bring back device to a good state before analyzing crashes.
  if platform == 'ANDROID' and crashes:
    # Remove this variable so that application is fully shutdown before every
    # re-run of testcase. This is critical for reproducibility.
    environment.remove_key('CHILD_PROCESS_TERMINATION_PATTERN')
    # TODO(unassigned): Need to find a way to this efficiently before every
    # testcase is analyzed.
    android.device.initialize_device()
  logs.log('Raw crash count: ' + str(len(crashes)))
  project_name = data_handler.get_project_name(self.job_type)
  # Process and save crashes to datastore.
  bot_name = environment.get_value('BOT_NAME')
  new_crash_count, known_crash_count, processed_groups = process_crashes(
      crashes=crashes,
      context=Context(
          project_name=project_name,
          bot_name=bot_name,
          job_type=self.job_type,
          fuzz_target=self.fuzz_target,
          redzone=self.redzone,
          disable_ubsan=self.disable_ubsan,
          platform_id=platform_id,
          crash_revision=crash_revision,
          fuzzer_name=self.fuzzer_name,
          window_argument=self.window_argument,
          fuzzer_metadata=fuzzer_metadata,
          testcases_metadata=testcases_metadata,
          timeout_multiplier=self.timeout_multiplier,
          test_timeout=self.test_timeout,
          thread_wait_timeout=THREAD_WAIT_TIMEOUT,
          data_directory=self.data_directory))
  read_and_upload_testcase_run_stats(
      self.fuzzer_name, self.fully_qualified_fuzzer_name, self.job_type,
      crash_revision, testcase_file_paths)
  upload_job_run_stats(self.fully_qualified_fuzzer_name, self.job_type,
                       crash_revision, time.time(),
                       new_crash_count, known_crash_count,
                       len(testcase_file_paths), processed_groups)
  # Delete the fuzzed testcases. This is explicitly needed since
  # some testcases might reside on NFS and would otherwise be
  # left forever.
  for testcase_file_path in testcase_file_paths:
    shell.remove_file(testcase_file_path)
  # Explicit cleanup for large vars.
  del testcase_file_paths
  del testcases_metadata
  utils.python_gc()
实在太多了,前面做了一些初始化,之后是选择引擎进行fuzz——self.do_engine_fuzzing(engine_impl)
,没有的就是黑盒测试self.do_blackbox_fuzzing
engine_impl = engine.get(fuzzer.name)
if engine_impl:
crashes, fuzzer_metadata = self.do_engine_fuzzing(engine_impl)
# Not applicable to engine fuzzers.
testcase_file_paths = []
testcases_metadata = {}
else:
fuzzer_directory = setup.get_fuzzer_directory(self.fuzzer_name)
fuzzer_metadata, testcase_file_paths, testcases_metadata, crashes = (
self.do_blackbox_fuzzing(fuzzer, fuzzer_directory, self.job_type))
接下来最后就是处理crashes,上传crash,最后更新任务的状态
# Process and save crashes to datastore.
bot_name = environment.get_value('BOT_NAME')
new_crash_count, known_crash_count, processed_groups = process_crashes(
crashes=crashes,
context=Context(
project_name=project_name,
bot_name=bot_name,
job_type=self.job_type,
fuzz_target=self.fuzz_target,
redzone=self.redzone,
disable_ubsan=self.disable_ubsan,
platform_id=platform_id,
crash_revision=crash_revision,
fuzzer_name=self.fuzzer_name,
window_argument=self.window_argument,
fuzzer_metadata=fuzzer_metadata,
testcases_metadata=testcases_metadata,
timeout_multiplier=self.timeout_multiplier,
test_timeout=self.test_timeout,
thread_wait_timeout=THREAD_WAIT_TIMEOUT,
data_directory=self.data_directory))
read_and_upload_testcase_run_stats(
self.fuzzer_name, self.fully_qualified_fuzzer_name, self.job_type,
crash_revision, testcase_file_paths)
upload_job_run_stats(self.fully_qualified_fuzzer_name, self.job_type,
crash_revision, time.time(),
new_crash_count, known_crash_count,
len(testcase_file_paths), processed_groups)
def do_engine_fuzzing(self, engine_impl):
  """Run fuzzing engine.

  Records the fuzz target, syncs the corpus, then runs MAX_TESTCASES rounds
  (default 1) of run_engine_fuzzer(). Each round uploads its log, crash
  inputs and run stats.

  Args:
    engine_impl: Engine instance; `engine_impl.name` is read here and the
      instance is passed on to run_engine_fuzzer().

  Returns:
    A tuple (crashes, fuzzer_metadata): Crash objects collected over all
    rounds and the merged metadata dict from the rounds.
  """
  # Record fuzz target.
  fuzz_target_name = environment.get_value('FUZZ_TARGET')
  self.fuzz_target = record_fuzz_target(engine_impl.name, fuzz_target_name,
                                        self.job_type)
  environment.set_value('FUZZER_NAME',
                        self.fuzz_target.fully_qualified_name())
  # Synchronize corpus files with GCS
  sync_corpus_directory = builtin.get_corpus_directory(
      self.data_directory, self.fuzz_target.project_qualified_name())
  self.sync_corpus(sync_corpus_directory)
  # Reset memory tool options.
  environment.reset_current_memory_tool_options(
      redzone_size=self.redzone, disable_ubsan=self.disable_ubsan)
  revision = environment.get_value('APP_REVISION')
  crashes = []
  fuzzer_metadata = {}
  return_code = 1  # Vanilla return-code for engine crashes.
  # Do the actual fuzzing.
  for fuzzing_round in range(environment.get_value('MAX_TESTCASES', 1)):
    logs.log('Fuzzing round {}.'.format(fuzzing_round))
    result, current_fuzzer_metadata = run_engine_fuzzer(
        engine_impl, self.fuzz_target.binary, sync_corpus_directory,
        self.testcase_directory)
    fuzzer_metadata.update(current_fuzzer_metadata)
    # Prepare stats.
    testcase_run = engine_common.get_testcase_run(result.stats,
                                                  result.command)
    # Upload logs, testcases (if there are crashes), and stats.
    # Use a consistent log time to allow correlating between logs, uploaded
    # testcases, and stats.
    log_time = datetime.datetime.utcfromtimestamp(
        float(testcase_run.timestamp))
    crash_result = CrashResult(return_code, result.time_executed, result.logs)
    log = testcase_manager.prepare_log_for_upload(
        crash_result.get_stacktrace(), return_code)
    testcase_manager.upload_log(log, log_time)
    for crash in result.crashes:
      testcase_manager.upload_testcase(crash.input_path, log_time)
    add_additional_testcase_run_data(testcase_run,
                                     self.fuzz_target.fully_qualified_name(),
                                     self.job_type, revision)
    upload_testcase_run_stats(testcase_run)
    if result.crashes:
      crashes.extend([
          Crash.from_engine_crash(crash) for crash in result.crashes if crash
      ])
  logs.log('All fuzzing rounds complete.')
  self.sync_new_corpus_files()
  return crashes, fuzzer_metadata
实际fuzz是run_engine_fuzzer
def run_engine_fuzzer(engine_impl, target_name, sync_corpus_directory,
                      testcase_directory):
  """Run engine for fuzzing.

  Args:
    engine_impl: Engine object providing prepare() and fuzz().
    target_name: Fuzz target name, resolved to a path inside BUILD_DIR.
    sync_corpus_directory: Corpus directory passed to engine_impl.prepare().
    testcase_directory: Directory passed to engine_impl.fuzz().

  Returns:
    A tuple (result, fuzzer_metadata). On a trusted host this is whatever
    tasks_host.engine_fuzz() returns.
  """
  if environment.is_trusted_host():
    # Reader's note: on a trusted host the run goes through the untrusted
    # runner instead of the local engine call below.
    from bot.untrusted_runner import tasks_host
    return tasks_host.engine_fuzz(engine_impl, target_name,
                                  sync_corpus_directory, testcase_directory)
  build_dir = environment.get_value('BUILD_DIR')
  target_path = engine_common.find_fuzzer_path(build_dir, target_name)
  options = engine_impl.prepare(sync_corpus_directory, target_path, build_dir)
  fuzz_test_timeout = environment.get_value('FUZZ_TEST_TIMEOUT')
  # Call the fuzz() method of the selected engine.
  result = engine_impl.fuzz(target_path, options, testcase_directory,
                            fuzz_test_timeout)
  logs.log('Used strategies.', strategies=options.strategies)
  # Record each strategy in the stats under a 'strategy_' prefixed key.
  for strategy, value in six.iteritems(options.strategies):
    result.stats['strategy_' + strategy] = value
  # Format logs with header and strategy information.
  log_header = engine_common.get_log_header(result.command,
                                            environment.get_value('BOT_NAME'),
                                            result.time_executed)
  formatted_strategies = engine_common.format_fuzzing_strategies(
      options.strategies)
  result.logs = log_header + '\n' + result.logs + '\n' + formatted_strategies
  fuzzer_metadata = {
      'fuzzer_binary_name': target_name,
  }
  fuzzer_metadata.update(engine_common.get_all_issue_metadata(target_path))
  _add_issue_metadata_from_environment(fuzzer_metadata)
  return result, fuzzer_metadata
上面提到的运行fuzzer,是通过engine.get获取引擎类的,get函数如下
def get(name):
  """Get an implementation of a fuzzing engine, or None if one does not exist.

  Args:
    name: Engine name used as the key into _ENGINES.

  Returns:
    A new instance of the registered engine class, or None.
  """
  engine_class = _ENGINES.get(name)
  if engine_class:
    # A fresh instance is created on every call.
    return engine_class()
  return None
而之前得先注册
def register(name, engine_class):
  """Register a fuzzing engine.

  Args:
    name: Key to register the engine under.
    engine_class: Engine class stored in _ENGINES (instantiated by get()).

  Raises:
    ValueError: If an engine is already registered under `name`.
  """
  if name in _ENGINES:
    raise ValueError('Engine {name} is already registered'.format(name=name))
  _ENGINES[name] = engine_class
而注册是在运行src/python/bot/startup/run_bot.py
的时候完成的
......
from bot.fuzzers import init as fuzzers_init
......
def main():
if not profiler.start_if_needed('python_profiler_bot'):
sys.exit(-1)
fuzzers_init.run() # 在这里注册
跟过去可以看到注册了libFuzzer、honggfuzz和syzkaller,有疑问的是怎么没有afl
def run():
  """Initialise builtin fuzzing engines."""
  # Registers three engines by name: libFuzzer, honggfuzz and syzkaller.
  engine.register('libFuzzer', libFuzzer_engine.LibFuzzerEngine)
  engine.register('honggfuzz', honggfuzz_engine.HonggfuzzEngine)
  engine.register('syzkaller', syzkaller_engine.SyzkallerEngine)
def do_blackbox_fuzzing(self, fuzzer, fuzzer_directory, job_type):
"""Run blackbox fuzzing. Currently also used for engine fuzzing."""
# Set the thread timeout values.
# TODO(ochang): Remove this hack once engine fuzzing refactor is compelte.
fuzz_test_timeout = environment.get_value('FUZZ_TEST_TIMEOUT')
if fuzz_test_timeout:
test_timeout = set_test_timeout(fuzz_test_timeout,
self.timeout_multiplier)
else:
test_timeout = self.test_timeout
thread_timeout = test_timeout
# Determine number of testcases to process.
testcase_count = environment.get_value('MAX_TESTCASES')
# For timeout multipler greater than 1, we need to decrease testcase count
# to prevent exceeding task lease time.
if self.timeout_multiplier > 1:
testcase_count /= self.timeout_multiplier
# Run the fuzzer to generate testcases. If error occurred while trying
# to run the fuzzer, bail out.
(error_occurred, testcase_file_paths, sync_corpus_directory,
fuzzer_metadata) = self.generate_blackbox_testcases(
fuzzer, fuzzer_directory, testcase_count)
......
......
在self.generate_blackbox_testcases里面是会实际启动fuzzer的,注释说的是Run the blackbox fuzzer and generate testcases.
def generate_blackbox_testcases(self, fuzzer, fuzzer_directory,
testcase_count):
"""Run the blackbox fuzzer and generate testcases."""
# Helper variables.
error_occurred = False
fuzzer_revision = fuzzer.revision
fuzzer_name = fuzzer.name
sync_corpus_directory = None
# Clear existing testcases (only if past task failed).
testcase_directories = get_testcase_directories(self.testcase_directory,
self.data_directory)
testcase_manager.remove_testcases_from_directories(testcase_directories)
# Set an environment variable for fuzzer name.
# TODO(ochang): Investigate removing this. Only users appear to be chromebot
# fuzzer and fuzzer_logs, both of which can be removed.
environment.set_value('FUZZER_NAME', fuzzer_name)
# Set minimum redzone size, do not detect leaks and zero out the
# quarantine size before running the fuzzer.
environment.reset_current_memory_tool_options(
redzone_size=16, leaks=False, quarantine_size_mb=0)
if fuzzer.builtin:
fuzzer_command = 'builtin'
builtin_fuzzer = builtin_fuzzers.get(fuzzer.name) #这里面有个afl和libfuzz
builtin_result = builtin_fuzzer.run(
self.data_directory, self.testcase_directory, testcase_count)
fuzzer_output = builtin_result.output
sync_corpus_directory = builtin_result.corpus_directory
# Return code is always 0 as builtin fuzzers log errors directly.
fuzzer_return_code = 0
else:
# Make sure we have a file to execute for the fuzzer.
......
......
# 获取可执行文件fuzzer路径,应该是libfuzzer那样,二进制文件就是fuzzer
# Get the fuzzer executable and chdir to its base directory. This helps to
# prevent referencing every file using __file__.
fuzzer_executable = os.path.join(fuzzer_directory, fuzzer.executable_path)
fuzzer_executable_directory = os.path.dirname(fuzzer_executable)
......
......
# Build the fuzzer command execution string.
command = shell.get_execute_command(fuzzer_executable)
......
command_format = ('%s --input_dir%s%s --output_dir%s%s --no_of_files%s%d')
fuzzer_command = str(
command_format % (command, argument_seperator, self.data_directory,
argument_seperator, self.testcase_directory,
argument_seperator, testcase_count))
fuzzer_timeout = environment.get_value('FUZZER_TIMEOUT')
# Run the fuzzer. 启动fuzzer
logs.log('Running fuzzer - %s.' % fuzzer_command)
fuzzer_return_code, fuzzer_duration, fuzzer_output = (
process_handler.run_process(
fuzzer_command,
current_working_directory=fuzzer_executable_directory,
timeout=fuzzer_timeout,
testcase_run=False,
ignore_children=False))
下面是builtin_fuzzer = builtin_fuzzers.get(fuzzer.name)
所能获取到的
# Builtin fuzzer instances keyed by fuzzer name; looked up by get().
BUILTIN_FUZZERS = {
    'afl': afl.Afl(),
    'libFuzzer': libFuzzer.LibFuzzer(),
}
def get(fuzzer_name):
  """Get the builtin fuzzer with the given name, or None.

  Args:
    fuzzer_name: Key into BUILTIN_FUZZERS.

  Returns:
    The instance stored in BUILTIN_FUZZERS, or None for unknown names.
  """
  if fuzzer_name not in BUILTIN_FUZZERS:
    return None
  return BUILTIN_FUZZERS[fuzzer_name]
下面是do_blackbox_fuzzing函数的后半部分,是处理testcases的
............
............
# Start processing the testcases.
while test_number < len(testcase_file_paths):
thread_index = 0
threads = []
temp_queue = process_handler.get_queue()
if not temp_queue:
process_handler.terminate_stale_application_instances()
logs.log_error('Unable to create temporary crash queue.')
break
while thread_index < max_threads and test_number < len(
testcase_file_paths):
testcase_file_path = testcase_file_paths[test_number]
gestures = testcases_metadata[testcase_file_path]['gestures']
env_copy = environment.copy()
thread = process_handler.get_process()(
target=testcase_manager.run_testcase_and_return_result_in_queue,
args=(temp_queue, thread_index, testcase_file_path, gestures,
env_copy, True))
try:
thread.start()
except:
process_handler.terminate_stale_application_instances()
thread_error_occurred = True
logs.log_error('Unable to start new thread.')
break
threads.append(thread)
thread_index += 1
test_number += 1
if test_number % testcases_before_stale_process_cleanup == 0:
needs_stale_process_cleanup = True
time.sleep(thread_delay)
............
............
上面调用了run_testcase_and_return_result_in_queue
,它是运行单个testcase,并且上传crash信息
def run_testcase_and_return_result_in_queue(crash_queue,
                                            thread_index,
                                            file_path,
                                            gestures,
                                            env_copy,
                                            upload_output=False):
  """Run a single testcase and return crash results in the crash queue.

  Re-initializes logging and the NDB context, then forwards every argument
  unchanged to _do_run_testcase_and_return_result_in_queue().
  """
  # Since this is running in its own process, initialize the log handler again.
  # This is needed for Windows where instances are not shared across child
  # processes. See:
  # https://stackoverflow.com/questions/34724643/python-logging-with-multiprocessing-root-logger-different-in-windows
  logs.configure('run_testcase', {
      'testcase_path': file_path,
  })
  # Also reinitialize NDB context for the same reason as above.
  with ndb_init.context():
    _do_run_testcase_and_return_result_in_queue(
        crash_queue,
        thread_index,
        file_path,
        gestures,
        env_copy,
        upload_output=upload_output)
里面又调用了_do_run_testcase_and_return_result_in_queue
,里面就是上传CrashResult了,根据这,实际的fuzz代码应该就是self.generate_blackbox_testcases
def _do_run_testcase_and_return_result_in_queue(crash_queue,
                                                thread_index,
                                                file_path,
                                                gestures,
                                                env_copy,
                                                upload_output=False):
  """Run a single testcase and return crash results in the crash queue.

  Args:
    crash_queue: Queue that receives a Crash object when the run crashed.
    thread_index: Passed through to run_testcase().
    file_path: Path of the testcase to run.
    gestures: Passed through to run_testcase() and stored on the Crash.
    env_copy: Passed through to run_testcase().
    upload_output: When true, also upload the log and (for crashes) the
      testcase.

  Any Exception is caught and logged; nothing is re-raised to the caller.
  """
  try:
    # Run testcase and check whether a crash occurred or not.
    return_code, crash_time, output = run_testcase(thread_index, file_path,
                                                   gestures, env_copy)
    # Pull testcase directory to host to get any stats files.
    if environment.is_trusted_host():
      from bot.untrusted_runner import file_host
      file_host.pull_testcases_from_worker()
    # Analyze the crash.
    crash_output = _get_crash_output(output)
    crash_result = CrashResult(return_code, crash_time, crash_output)
    # To provide consistency between stats and logs, we use timestamp taken
    # from stats when uploading logs and testcase.
    if upload_output:
      log_time = _get_testcase_time(file_path)
    if crash_result.is_crash():
      # Initialize resource list with the testcase path.
      resource_list = [file_path]
      resource_list += get_resource_paths(crash_output)
      # Store the crash stack file in the crash stacktrace directory
      # with filename as the hash of the testcase path.
      crash_stacks_directory = environment.get_value('CRASH_STACKTRACES_DIR')
      stack_file_path = os.path.join(crash_stacks_directory,
                                     utils.string_hash(file_path))
      utils.write_data_to_file(crash_output, stack_file_path)
      # Put the crash result in the crash queue (this branch only runs when
      # a crash was detected).
      crash_queue.put(
          Crash(
              file_path=file_path,
              crash_time=crash_time,
              return_code=return_code,
              resource_list=resource_list,
              gestures=gestures,
              stack_file_path=stack_file_path))
      # Don't upload uninteresting testcases (no crash) or if there is no log to
      # correlate it with (not upload_output).
      if upload_output:
        upload_testcase(file_path, log_time)
    if upload_output:
      # Include full output for uploaded logs (crash output, merge output, etc).
      crash_result_full = CrashResult(return_code, crash_time, output)
      log = prepare_log_for_upload(crash_result_full.get_stacktrace(),
                                   return_code)
      upload_log(log, log_time)
  except Exception:
    logs.log_error('Exception occurred while running '
                   'run_testcase_and_return_result_in_queue.')
我们上传zip包,但是里面可能有多个target,有些可能只是fuzzer所需文件,那怎么找到要运行的target呢
我们跟踪一下
是fuzz task,就进来src/python/bot/tasks/fuzz_task.py
执行execute_task
def execute_task(fuzzer_name, job_type):
  """Runs the given fuzzer for one round.

  Args:
    fuzzer_name: Fuzzer name, passed to FuzzingSession.
    job_type: Job type, passed to FuzzingSession.
  """
  # The test timeout comes from the TEST_TIMEOUT environment value.
  test_timeout = environment.get_value('TEST_TIMEOUT')
  session = FuzzingSession(fuzzer_name, job_type, test_timeout)
  session.run()
上面调用FuzzingSession
类里面的run函数,在run函数里面https://github.com/google/clusterfuzz/blob/9c2065a7f7b7802936b1133733402adc65ac0c4c/src/python/bot/tasks/fuzz_task.py#L1843
这里调用了build_manager.setup_build
build_setup_result = build_manager.setup_build(
environment.get_value('APP_REVISION'), target_weights=target_weights)
跟进build_manager.setup_build
def setup_build(revision=0, target_weights=None):
  """Set up a custom or regular build based on revision.

  The first matching case wins: CUSTOM_BINARY, SYSTEM_BINARY_DIR,
  FUZZ_TARGET_BUILD_BUCKET_PATH (split targets build), a truthy `revision`
  (regular build), otherwise a trunk build.

  Args:
    revision: Revision to set up; a falsy value selects a trunk build.
    target_weights: Passed through to the chosen setup function (except the
      system binary one).

  Returns:
    Whatever the selected setup function returns.
  """
  # For custom binaries we always use the latest version. Revision is ignored.
  custom_binary = environment.get_value('CUSTOM_BINARY')
  if custom_binary:
    return setup_custom_binary(target_weights=target_weights)
  # In this case, we assume the build is already installed on the system.
  system_binary = environment.get_value('SYSTEM_BINARY_DIR')
  if system_binary:
    return setup_system_binary()
  fuzz_target_build_bucket_path = environment.get_value(
      'FUZZ_TARGET_BUILD_BUCKET_PATH')
  if fuzz_target_build_bucket_path:
    # Split fuzz target build.
    return _setup_split_targets_build(
        fuzz_target_build_bucket_path, target_weights, revision=revision)
  if revision:
    # Setup regular build with revision.
    return setup_regular_build(revision, target_weights=target_weights)
  # If no revision is provided, we default to a trunk build.
  # Collect every configured bucket path from the default env vars.
  bucket_paths = []
  for env_var in DEFAULT_BUILD_BUCKET_PATH_ENV_VARS:
    bucket_path = environment.get_value(env_var)
    if bucket_path:
      bucket_paths.append(bucket_path)
  return setup_trunk_build(bucket_paths, target_weights=target_weights)
首先是setup_custom_binary
,里面调用了CustomBuild的setup
build = CustomBuild(
base_build_dir,
job.custom_binary_key,
job.custom_binary_filename,
job.custom_binary_revision,
target_weights=target_weights)
# Revert back the actual job name.
if share_build_job_type:
environment.set_value('JOB_NAME', old_job_name)
if build.setup():
return build
看setup
def setup(self):
  """Set up the custom binary for a particular job.

  Returns:
    True on success, False if unpacking the custom build failed.
  """
  self._pre_setup()
  # Track the key for the custom binary so we can create a download link
  # later.
  environment.set_value('BUILD_KEY', self.custom_binary_key)
  logs.log('Retrieving custom binary build r%d.' % self.revision)
  revision_file = os.path.join(self.build_dir, REVISION_FILE_NAME)
  build_update = revisions.needs_update(revision_file, self.revision)
  if build_update:
    # Unpack the uploaded archive (per the original reading note, this calls
    # archive.unpack internally).
    if not self._unpack_custom_build():
      return False
    logs.log('Retrieved custom binary build r%d.' % self.revision)
  else:
    logs.log('Build already exists.')
  # Pick the fuzz target among those found in the build directory.
  _set_random_fuzz_target_for_fuzzing_if_needed(
      self._get_fuzz_targets_from_dir(self.build_dir), self.target_weights)
  self._setup_application_path(build_update=build_update)
  self._post_setup_success(update_revision=build_update)
  return True
上面的_set_random_fuzz_target_for_fuzzing_if_needed
就选择压缩包中的二进制文件了
def _set_random_fuzz_target_for_fuzzing_if_needed(fuzz_targets, target_weights):
  """Sets a random fuzz target for fuzzing.

  Args:
    fuzz_targets: Iterable of candidate fuzz target names.
    target_weights: Weights handed to fuzzer_selection.select_fuzz_target
        together with the candidates.

  Returns:
    The value of FUZZ_TARGET if one was already set in the environment,
    otherwise the newly selected target. None when the job is not an engine
    fuzzer job or when no candidates were found.
  """
  fuzz_target = environment.get_value('FUZZ_TARGET')
  if fuzz_target:
    # A target was chosen earlier; keep it instead of re-rolling.
    logs.log('Use previously picked fuzz target %s for fuzzing.' % fuzz_target)
    return fuzz_target

  if not environment.is_engine_fuzzer_job():
    return None

  # Materialize the iterable so it can be checked for emptiness and counted.
  fuzz_targets = list(fuzz_targets)
  if not fuzz_targets:
    logs.log_error('No fuzz targets found. Unable to pick random one.')
    return None

  environment.set_value('FUZZ_TARGET_COUNT', len(fuzz_targets))

  fuzz_target = fuzzer_selection.select_fuzz_target(fuzz_targets,
                                                    target_weights)
  environment.set_value('FUZZ_TARGET', fuzz_target)
  logs.log('Picked fuzz target %s for fuzzing.' % fuzz_target)

  return fuzz_target
但候选目标最初来自 `_get_fuzz_targets_from_dir`,之后再根据这个结果,通过 `target_weights` 从里面选,所以第一步筛选还是在 `_get_fuzz_targets_from_dir`,由它确保候选文件确实是一个 fuzzer。
`CustomBuild` 没有定义这个函数,那实际调用的就是父类的 `_get_fuzz_targets_from_dir`,可以看到它调用的是 `get_fuzz_targets`:
def _get_fuzz_targets_from_dir(self, build_dir):
  """Get iterator of fuzz targets from build dir.

  Args:
    build_dir: Directory handed to fuzzer_utils.get_fuzz_targets.

  Yields:
    The base name of each returned fuzz target path with its extension
    stripped.
  """
  # Import here as this path is not available in App Engine context.
  from bot.fuzzers import utils as fuzzer_utils

  for path in fuzzer_utils.get_fuzz_targets(build_dir):
    yield os.path.splitext(os.path.basename(path))[0]
而这个 `fuzzer_utils.get_fuzz_targets` 来自 `from bot.fuzzers import utils as fuzzer_utils`,也就是 `src/python/bot/fuzzers/utils.py` 里面的:
def get_fuzz_targets(path):
  """Get list of fuzz targets paths.

  On a trusted host the lookup is delegated to
  file_host.get_fuzz_targets; otherwise |path| is scanned locally.

  Args:
    path: Directory to search for fuzz targets.

  Returns:
    The result of file_host.get_fuzz_targets(path) on a trusted host,
    otherwise the list built by get_fuzz_targets_local(path).
  """
  if environment.is_trusted_host():
    # Imported lazily, only on the trusted-host code path.
    from bot.untrusted_runner import file_host
    return file_host.get_fuzz_targets(path)

  return get_fuzz_targets_local(path)
继续跟进get_fuzz_targets_local
def get_fuzz_targets_local(path):
  """Get list of fuzz targets paths (local).

  Walks |path| with shell.walk and keeps every file that
  is_fuzz_target_local accepts.

  Args:
    path: Directory to walk.

  Returns:
    List of full paths of the files identified as fuzz targets.
  """
  fuzz_target_paths = []

  for root, _, files in shell.walk(path):
    for filename in files:
      file_path = os.path.join(root, filename)
      if is_fuzz_target_local(file_path):
        fuzz_target_paths.append(file_path)

  return fuzz_target_paths
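关键就是 `is_fuzz_target_local` 了,判断条件依次是:
1、首先文件名得满足正则 `VALID_TARGET_NAME`
2、后缀名得在 `ALLOWED_FUZZ_TARGET_EXTENSIONS` 里,即无后缀、`.exe` 或者 `.par`
3、最后就是文件内容中得有 `FUZZ_TARGET_SEARCH_BYTES`,也即 `LLVMFuzzerTestOneInput` 这个函数名
注意:如果文件名以 `_fuzzer` 结尾,会直接判定为 fuzz target;如果设置了 `FUZZER_NAME_REGEX`,则只按该正则匹配文件名。这两种情况都不会再检查文件内容。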
# File extensions a fuzz target binary may have ('' means no extension).
ALLOWED_FUZZ_TARGET_EXTENSIONS = ['', '.exe', '.par']
# Byte string searched for inside a candidate file.
FUZZ_TARGET_SEARCH_BYTES = b'LLVMFuzzerTestOneInput'
# Target names may only contain letters, digits, underscore and dash.
VALID_TARGET_NAME = re.compile(r'^[a-zA-Z0-9_-]+$')


def is_fuzz_target_local(file_path, file_handle=None):
  """Returns whether |file_path| is a fuzz target binary (local path).

  Checks are applied in this order:
    1. The base name (without extension) must match VALID_TARGET_NAME.
    2. The extension must be in ALLOWED_FUZZ_TARGET_EXTENSIONS.
    3. Without a |file_handle|, the path must exist.
    4. A name ending in '_fuzzer' is accepted without reading the file.
    5. If FUZZER_NAME_REGEX is set, the result is solely whether the name
       matches that regex.
    6. An existing path that is not a regular file is rejected.
    7. Otherwise the content is searched for FUZZ_TARGET_SEARCH_BYTES.

  Args:
    file_path: Path of the candidate file.
    file_handle: Optional already opened binary file object to search instead
        of opening |file_path|. It is never closed by this function.

  Returns:
    True if the file is considered a fuzz target, False otherwise (for the
    content search, whatever utils.search_bytes_in_file returns).
  """
  # TODO(hzawawy): handle syzkaller case.
  filename, file_extension = os.path.splitext(os.path.basename(file_path))
  if not VALID_TARGET_NAME.match(filename):
    # Check fuzz target has a valid name (without any special chars).
    return False

  if file_extension not in ALLOWED_FUZZ_TARGET_EXTENSIONS:
    # Ignore files with disallowed extensions (to prevent opening e.g. .zips).
    return False

  if not file_handle and not os.path.exists(file_path):
    # Ignore non-existent files for cases when we don't have a file handle.
    return False

  if filename.endswith('_fuzzer'):
    return True

  # TODO(aarya): Remove this optimization if it does not show up significant
  # savings in profiling results.
  fuzz_target_name_regex = environment.get_value('FUZZER_NAME_REGEX')
  if fuzz_target_name_regex:
    return bool(re.match(fuzz_target_name_regex, filename))

  if os.path.exists(file_path) and not stat.S_ISREG(os.stat(file_path).st_mode):
    # Don't read special files (eg: /dev/urandom).
    logs.log_warn('Tried to read from non-regular file: %s.' % file_path)
    return False

  # Use already provided file handle or open the file.
  local_file_handle = file_handle or open(file_path, 'rb')
  try:
    # TODO(metzman): Bound this call so we don't read forever if something
    # went wrong.
    return utils.search_bytes_in_file(FUZZ_TARGET_SEARCH_BYTES,
                                      local_file_handle)
  finally:
    if not file_handle:
      # If this local file handle is owned by our function, close it now,
      # even when the search raised. Otherwise, it is caller's
      # responsibility.
      local_file_handle.close()