前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >ClusterFuzz的bot源码(fuzz task)阅读

ClusterFuzz的bot源码(fuzz task)阅读

作者头像
用户1423082
发布2024-12-31 20:13:12
发布2024-12-31 20:13:12
3100
代码可运行
举报
文章被收录于专栏:giantbranch's bloggiantbranch's blog
运行总次数:0
代码可运行

当时阅读时候的版本:ClusterFuzz v2.0.1

总览

首先运行bot的入口是python butler.py run_bot

代码语言:javascript
代码运行次数:0
运行
复制
args = parser.parse_args()
if not args.command:
  parser.print_help()
  return

_setup()
command = importlib.import_module('local.butler.%s' % args.command)
command.execute(args)

之后执行中的src/local/butler/run_bot.py中的execute

代码语言:javascript
代码运行次数:0
运行
复制
def execute(args):
  """Run the bot."""
  appengine_path = appengine.find_sdk_path()

  _setup_bot_directory(args)
  _setup_environment_and_configs(args, appengine_path)

  try:
    os.chdir(os.path.join(args.directory, 'clusterfuzz'))
    proc = common.execute_async('python src/python/bot/startup/run_bot.py')

    def _stop_handler(*_):
      print('Bot has been stopped. Exit.')
      proc.kill()

    signal.signal(signal.SIGTERM, _stop_handler)
    common.process_proc_output(proc)
    proc.wait()
  except KeyboardInterrupt:
    _stop_handler()

之后就是执行src/python/bot/startup/run_bot.py,而这里面是有定义main函数的

代码语言:javascript
代码运行次数:0
运行
复制
if __name__ == '__main__':
  if sys.version_info.major == 3:
    # TODO(ochang): Remove check once all migrated to Python 3.
    multiprocessing.set_start_method('spawn')

  try:
    with ndb_init.context():
      main()
    exit_code = 0
  except Exception:
    traceback.print_exc()
    exit_code = 1

  monitor.stop()

  # Prevent python GIL deadlocks on shutdown. See https://crbug.com/744680.
  os._exit(exit_code)  # pylint: disable=protected-access

上面就是一些初始化就执行main函数,我们来看main函数

代码语言:javascript
代码运行次数:0
运行
复制
def main():
  """Prepare the configuration options and start requesting tasks."""
  logs.configure('run_bot')

  root_directory = environment.get_value('ROOT_DIR')
  if not root_directory:
    print('Please set ROOT_DIR environment variable to the root of the source '
          'checkout before running. Exiting.')
    print('For an example, check init.bash in the local directory.')
    return

  dates.initialize_timezone_from_environment()
  environment.set_bot_environment()
  monitor.initialize()

  if not profiler.start_if_needed('python_profiler_bot'):
    sys.exit(-1)

  fuzzers_init.run()

  if environment.is_trusted_host(ensure_connected=False):
    from bot.untrusted_runner import host
    host.init()

  if environment.is_untrusted_worker():
    # Track revision since we won't go into the task_loop.
    update_task.track_revision()

    from bot.untrusted_runner import untrusted as untrusted_worker
    untrusted_worker.start_server()
    assert False, 'Unreachable code'

  while True:
    # task_loop should be an infinite loop,
    # unless we run into an exception.
    error_stacktrace, clean_exit, task_payload = task_loop()   ### 这里是核心,task_loop

    # Print the error trace to the console.
    if not clean_exit:
      print('Exception occurred while running "%s".' % task_payload)
      print('-' * 80)
      print(error_stacktrace)
      print('-' * 80)

    should_terminate = (
        clean_exit or errors.error_in_list(error_stacktrace,
                                           errors.BOT_ERROR_TERMINATION_LIST))
    if should_terminate:
      return

    logs.log_error(
        'Task exited with exception (payload="%s").' % task_payload,
        error_stacktrace=error_stacktrace)

    should_hang = errors.error_in_list(error_stacktrace,
                                       errors.BOT_ERROR_HANG_LIST)
    if should_hang:
      logs.log('Start hanging forever.')
      while True:
        # Sleep to avoid consuming 100% of CPU.
        time.sleep(60)

    # See if our run timed out, if yes bail out.
    if data_handler.bot_run_timed_out():
      return

    # Clear the current exception.
    utils.exc_clear()

看到task_loop()函数,task = tasks.get_task()获取任务,commands.process_command(task)执行命令并删除任务

代码语言:javascript
代码运行次数:0
运行
复制
def task_loop():
  """Executes tasks indefinitely."""
  clean_exit = False
  while True:
    exception_occurred = False
    task = None
    # This caches the current environment on first run. Don't move this.
    environment.reset_environment()
    try:
      # Run regular updates.
      update_task.run()
      update_task.track_revision()

      task = tasks.get_task() 		### 获取从任务
      if not task:
        continue

      with _Monitor(task):
        with task.lease():
          # Execute the command and delete the task.
          commands.process_command(task)    # 执行命令并删除任务
    except SystemExit as e:
      exception_occurred = True
      clean_exit = (e.code == 0)
      if not clean_exit and not isinstance(e, untrusted.HostException):
        logs.log_error('SystemExit occurred while working on task.')
    except commands.AlreadyRunningError:
      exception_occurred = False
    except Exception:
      logs.log_error('Error occurred while working on task.')
      exception_occurred = True

    if exception_occurred:
      # Prevent looping too quickly. See: crbug.com/644830
      failure_wait_interval = environment.get_value('FAIL_WAIT')
      time.sleep(utils.random_number(1, failure_wait_interval))
      break

  task_payload = task.payload() if task else None
  return traceback.format_exc(), clean_exit, task_payload

获取任务

我们先看get_task函数,可以看这里除了fuzz的任务,还有其他任务

代码语言:javascript
代码运行次数:0
运行
复制
def get_task():
  """Get a task."""
  task = get_command_override()
  if task:
    return task

  # TODO(unassigned): Remove this hack.
  if environment.get_value('ML'):
    return get_regular_task(queue=ML_JOBS_TASKQUEUE)

  allow_all_tasks = not environment.get_value('PREEMPTIBLE')
  if allow_all_tasks:
    # Check the high-end jobs queue for bots with multiplier greater than 1.
    thread_multiplier = environment.get_value('THREAD_MULTIPLIER')
    if thread_multiplier and thread_multiplier > 1:
      task = get_high_end_task()
      if task:
        return task

    task = get_regular_task()
    if task:
      return task

  task = get_fuzz_task()
  if not task:
    logs.log_error('Failed to get any fuzzing tasks. This should not happen.')
    time.sleep(TASK_EXCEPTION_WAIT_INTERVAL)

  return task

我们还是比较关心fuzz,看get_fuzz_task,这里是获取argument和job

代码语言:javascript
代码运行次数:0
运行
复制
def get_fuzz_task():
  """Try to get a fuzz task."""
  argument, job = fuzzer_selection.get_fuzz_task_payload()
  if not argument:
    return None

  return Task('fuzz', argument, job)

继续跟fuzzer_selection.get_fuzz_task_payload(),这里是去谷歌云那边查询任务了,最后随机选取任务返回

代码语言:javascript
代码运行次数:0
运行
复制
def get_fuzz_task_payload(platform=None):
  """Select a fuzzer that can run on this platform."""
  if not platform:
    queue_override = environment.get_value('QUEUE_OVERRIDE')
    platform = queue_override if queue_override else environment.platform()

  query = data_types.FuzzerJob.query()
  query = query.filter(data_types.FuzzerJob.platform == platform)

  mappings = list(ndb_utils.get_all_from_query(query))
  if not mappings:
    return None, None

  selection = utils.random_weighted_choice(
      mappings, weight_attribute='actual_weight')   # 最后随机选取任务返回
  return selection.fuzzer, selection.job

执行任务

代码语言:javascript
代码运行次数:0
运行
复制
# pylint: disable=too-many-nested-blocks
# TODO(mbarbella): Rewrite this function to avoid nesting issues.
@set_task_payload
def process_command(task):
  """Figures out what to do with the given task and executes the command."""
  logs.log("Executing command '%s'" % task.payload())
  if not task.payload().strip():
    logs.log_error('Empty task received.')
    return

  # Parse task payload.
  task_name = task.command
  task_argument = task.argument
  job_name = task.job
  ......
  ......
  ......
    # Match the cpu architecture with the ones required in the job definition.
  # If they don't match, then bail out and recreate task.
  if not is_supported_cpu_arch_for_job():
    logs.log(
        'Unsupported cpu architecture specified in job definition, exiting.')
    tasks.add_task(task_name, task_argument, job_name)
    return

  # Initial cleanup.
  cleanup_task_state()

  start_web_server_if_needed()

  try:
    run_command(task_name, task_argument, job_name) # 运行命令
  finally:
    # Final clean up.
    cleanup_task_state()

看run_command,实际是task_module.execute_task(task_argument, job_name)执行

代码语言:javascript
代码运行次数:0
运行
复制
def run_command(task_name, task_argument, job_name):
  """Run the command."""
  if task_name not in COMMAND_MAP:
    logs.log_error("Unknown command '%s'" % task_name)
    return

  task_module = COMMAND_MAP[task_name]

  # If applicable, ensure this is the only instance of the task running.
  task_state_name = ' '.join([task_name, task_argument, job_name])
  if should_update_task_status(task_name):
    if not data_handler.update_task_status(task_state_name,
                                           data_types.TaskState.STARTED):
      logs.log('Another instance of "{}" already '
               'running, exiting.'.format(task_state_name))
      raise AlreadyRunningError

  try:
    task_module.execute_task(task_argument, job_name)  # 执行任务
  except errors.InvalidTestcaseError:
    # It is difficult to try to handle the case where a test case is deleted
    # during processing. Rather than trying to catch by checking every point
    # where a test case is reloaded from the datastore, just abort the task.
    logs.log_error('Test case %s no longer exists.' % task_argument)
  except BaseException:
    # On any other exceptions, update state to reflect error and re-raise.
    if should_update_task_status(task_name):
      data_handler.update_task_status(task_state_name,
                                      data_types.TaskState.ERROR)

    raise

  # Task completed successfully.
  if should_update_task_status(task_name):
    data_handler.update_task_status(task_state_name,
                                    data_types.TaskState.FINISHED)

其实一开始判断task_name是否在COMMAND_MAP中,可以看到除了fuzz任务外,还有很多任务

代码语言:javascript
代码运行次数:0
运行
复制
COMMAND_MAP = {
    'analyze': analyze_task,
    'blame': blame_task,
    'corpus_pruning': corpus_pruning_task,
    'fuzz': fuzz_task,
    'impact': impact_task,
    'minimize': minimize_task,
    'ml_train': ml_train_task,
    'progression': progression_task,
    'regression': regression_task,
    'symbolize': symbolize_task,
    'unpack': unpack_task,
    'upload_reports': upload_reports_task,
    'variant': variant_task,
}

继续跟task_module.execute_task,我们关注fuzz的吧,就是fuzz_task.execute_task

代码语言:javascript
代码运行次数:0
运行
复制
def execute_task(fuzzer_name, job_type):
  """Runs the given fuzzer for one round."""
  test_timeout = environment.get_value('TEST_TIMEOUT')
  session = FuzzingSession(fuzzer_name, job_type, test_timeout)
  session.run()

先获取超时,之后初始化FuzzingSession,初始化代码如下:

代码语言:javascript
代码运行次数:0
运行
复制

class FuzzingSession(object):
  """Class for orchestrating fuzzing sessions."""

  def __init__(self, fuzzer_name, job_type, test_timeout):
    self.fuzzer_name = fuzzer_name
    self.job_type = job_type

    # Set up randomly selected fuzzing parameters.
    self.redzone = pick_redzone()
    self.disable_ubsan = pick_ubsan_disabled(job_type)
    self.timeout_multiplier = pick_timeout_multiplier()
    self.window_argument = pick_window_argument()
    self.test_timeout = set_test_timeout(test_timeout, self.timeout_multiplier)

    # Set up during run().
    self.testcase_directory = None
    self.data_directory = None

    # Fuzzing engine specific state.
    self.fuzz_target = None
    self.gcs_corpus = None

run的代码

代码语言:javascript
代码运行次数:0
运行
复制
def run(self):
  """Run the fuzzing session."""
  failure_wait_interval = environment.get_value('FAIL_WAIT')

  # Update LSAN local blacklist with global blacklist.
  is_lsan_enabled = environment.get_value('LSAN')
  if is_lsan_enabled:
    leak_blacklist.copy_global_to_local_blacklist()

  # For some binaries, we specify trials, which are sets of flags that we only
  # apply some of the time. Adjust APP_ARGS for them if needed.
  trials.setup_additional_args_for_app()

  # Ensure that that the fuzzer still exists.
  logs.log('Setting up fuzzer and data bundles.')
  fuzzer = data_types.Fuzzer.query(
      data_types.Fuzzer.name == self.fuzzer_name).get()
  if not fuzzer or not setup.update_fuzzer_and_data_bundles(self.fuzzer_name):
    _track_fuzzer_run_result(self.fuzzer_name, 0, 0,
                             FuzzErrorCode.FUZZER_SETUP_FAILED)
    logs.log_error('Unable to setup fuzzer %s.' % self.fuzzer_name)

    # Artifical sleep to slow down continuous failed fuzzer runs if the bot is
    # using command override for task execution.
    time.sleep(failure_wait_interval)
    return

  self.testcase_directory = environment.get_value('FUZZ_INPUTS')

  # Set up a custom or regular build based on revision. By default, fuzzing
  # is done on trunk build (using revision=None). Otherwise, a job definition
  # can provide a revision to use via |APP_REVISION|.
  target_weights = fuzzer_selection.get_fuzz_target_weights()

  build_setup_result = build_manager.setup_build(
      environment.get_value('APP_REVISION'), target_weights=target_weights)
  # Check if we have an application path. If not, our build failed
  # to setup correctly.
  if not build_setup_result or not build_manager.check_app_path():
    _track_fuzzer_run_result(self.fuzzer_name, 0, 0,
                             FuzzErrorCode.BUILD_SETUP_FAILED)
    return

  dataflow_bucket_path = environment.get_value('DATAFLOW_BUILD_BUCKET_PATH')
  if dataflow_bucket_path:
    # Some fuzzing jobs may use auxiliary builds, such as DFSan instrumented
    # builds accompanying libFuzzer builds to enable DFT-based fuzzing.
    if not build_manager.setup_trunk_build(
        [dataflow_bucket_path], build_prefix='DATAFLOW'):
      logs.log_error('Failed to set up dataflow build.')

  # Save fuzz targets count to aid with CPU weighting.
  self._save_fuzz_targets_count()

  # Check if we have a bad build, i.e. one that crashes on startup.
  # If yes, bail out.
  logs.log('Checking for bad build.')
  crash_revision = environment.get_value('APP_REVISION')
  is_bad_build = testcase_manager.check_for_bad_build(self.job_type,
                                                      crash_revision)
  _track_build_run_result(self.job_type, crash_revision, is_bad_build)
  if is_bad_build:
    return

  # Data bundle directories can also have testcases which are kept in-place
  # because of dependencies.
  self.data_directory = setup.get_data_bundle_directory(self.fuzzer_name)
  if not self.data_directory:
    _track_fuzzer_run_result(self.fuzzer_name, 0, 0,
                             FuzzErrorCode.DATA_BUNDLE_SETUP_FAILED)
    logs.log_error(
        'Unable to setup data bundle %s.' % fuzzer.data_bundle_name)
    return

  engine_impl = engine.get(fuzzer.name)
  if engine_impl:
    crashes, fuzzer_metadata = self.do_engine_fuzzing(engine_impl)

    # Not applicable to engine fuzzers.
    testcase_file_paths = []
    testcases_metadata = {}
  else:
    fuzzer_directory = setup.get_fuzzer_directory(self.fuzzer_name)
    fuzzer_metadata, testcase_file_paths, testcases_metadata, crashes = (
        self.do_blackbox_fuzzing(fuzzer, fuzzer_directory, self.job_type))

  if crashes is None:
    # Error occurred in generate_blackbox_testcases.
    # TODO(ochang): Pipe this error a little better.
    return

  logs.log('Finished processing test cases.')

  platform = environment.platform()
  platform_id = environment.get_platform_id()

  # For Android, bring back device to a good state before analyzing crashes.
  if platform == 'ANDROID' and crashes:
    # Remove this variable so that application is fully shutdown before every
    # re-run of testcase. This is critical for reproducibility.
    environment.remove_key('CHILD_PROCESS_TERMINATION_PATTERN')

    # TODO(unassigned): Need to find a way to this efficiently before every
    # testcase is analyzed.
    android.device.initialize_device()

  logs.log('Raw crash count: ' + str(len(crashes)))

  project_name = data_handler.get_project_name(self.job_type)

  # Process and save crashes to datastore.
  bot_name = environment.get_value('BOT_NAME')
  new_crash_count, known_crash_count, processed_groups = process_crashes(
      crashes=crashes,
      context=Context(
          project_name=project_name,
          bot_name=bot_name,
          job_type=self.job_type,
          fuzz_target=self.fuzz_target,
          redzone=self.redzone,
          disable_ubsan=self.disable_ubsan,
          platform_id=platform_id,
          crash_revision=crash_revision,
          fuzzer_name=self.fuzzer_name,
          window_argument=self.window_argument,
          fuzzer_metadata=fuzzer_metadata,
          testcases_metadata=testcases_metadata,
          timeout_multiplier=self.timeout_multiplier,
          test_timeout=self.test_timeout,
          thread_wait_timeout=THREAD_WAIT_TIMEOUT,
          data_directory=self.data_directory))

  read_and_upload_testcase_run_stats(
      self.fuzzer_name, self.fully_qualified_fuzzer_name, self.job_type,
      crash_revision, testcase_file_paths)
  upload_job_run_stats(self.fully_qualified_fuzzer_name, self.job_type,
                       crash_revision, time.time(),
                       new_crash_count, known_crash_count,
                       len(testcase_file_paths), processed_groups)

  # Delete the fuzzed testcases. This is explicitly needed since
  # some testcases might reside on NFS and would otherwise be
  # left forever.
  for testcase_file_path in testcase_file_paths:
    shell.remove_file(testcase_file_path)

  # Explicit cleanup for large vars.
  del testcase_file_paths
  del testcases_metadata
  utils.python_gc()

实在太多了,前面做了一些初始化,之后是选择引擎进行fuzz——self.do_engine_fuzzing(engine_impl),没有的就是黑盒测试self.do_blackbox_fuzzing

代码语言:javascript
代码运行次数:0
运行
复制
engine_impl = engine.get(fuzzer.name)
   if engine_impl:
     crashes, fuzzer_metadata = self.do_engine_fuzzing(engine_impl)

     # Not applicable to engine fuzzers.
     testcase_file_paths = []
     testcases_metadata = {}
   else:
     fuzzer_directory = setup.get_fuzzer_directory(self.fuzzer_name)
     fuzzer_metadata, testcase_file_paths, testcases_metadata, crashes = (
         self.do_blackbox_fuzzing(fuzzer, fuzzer_directory, self.job_type))

接下来最后就是处理crashes,上传crash,最后更新任务的状态

代码语言:javascript
代码运行次数:0
运行
复制
# Process and save crashes to datastore.
    bot_name = environment.get_value('BOT_NAME')
    new_crash_count, known_crash_count, processed_groups = process_crashes(
        crashes=crashes,
        context=Context(
            project_name=project_name,
            bot_name=bot_name,
            job_type=self.job_type,
            fuzz_target=self.fuzz_target,
            redzone=self.redzone,
            disable_ubsan=self.disable_ubsan,
            platform_id=platform_id,
            crash_revision=crash_revision,
            fuzzer_name=self.fuzzer_name,
            window_argument=self.window_argument,
            fuzzer_metadata=fuzzer_metadata,
            testcases_metadata=testcases_metadata,
            timeout_multiplier=self.timeout_multiplier,
            test_timeout=self.test_timeout,
            thread_wait_timeout=THREAD_WAIT_TIMEOUT,
            data_directory=self.data_directory))

    read_and_upload_testcase_run_stats(
        self.fuzzer_name, self.fully_qualified_fuzzer_name, self.job_type,
        crash_revision, testcase_file_paths)
    upload_job_run_stats(self.fully_qualified_fuzzer_name, self.job_type,
                         crash_revision, time.time(),
                         new_crash_count, known_crash_count,
                         len(testcase_file_paths), processed_groups)

do_engine_fuzzing

代码语言:javascript
代码运行次数:0
运行
复制
def do_engine_fuzzing(self, engine_impl):
    """Run fuzzing engine."""
    # Record fuzz target.
    fuzz_target_name = environment.get_value('FUZZ_TARGET')
    self.fuzz_target = record_fuzz_target(engine_impl.name, fuzz_target_name,
                                          self.job_type)
    environment.set_value('FUZZER_NAME',
                          self.fuzz_target.fully_qualified_name())

    # Synchronize corpus files with GCS
    sync_corpus_directory = builtin.get_corpus_directory(
        self.data_directory, self.fuzz_target.project_qualified_name())
    self.sync_corpus(sync_corpus_directory)

    # Reset memory tool options.
    environment.reset_current_memory_tool_options(
        redzone_size=self.redzone, disable_ubsan=self.disable_ubsan)

    revision = environment.get_value('APP_REVISION')
    crashes = []
    fuzzer_metadata = {}
    return_code = 1  # Vanilla return-code for engine crashes.

    # Do the actual fuzzing.
    for fuzzing_round in range(environment.get_value('MAX_TESTCASES', 1)):
      logs.log('Fuzzing round {}.'.format(fuzzing_round))
      result, current_fuzzer_metadata = run_engine_fuzzer(
          engine_impl, self.fuzz_target.binary, sync_corpus_directory,
          self.testcase_directory)
      fuzzer_metadata.update(current_fuzzer_metadata)

      # Prepare stats.
      testcase_run = engine_common.get_testcase_run(result.stats,
                                                    result.command)

      # Upload logs, testcases (if there are crashes), and stats.
      # Use a consistent log time to allow correlating between logs, uploaded
      # testcases, and stats.
      log_time = datetime.datetime.utcfromtimestamp(
          float(testcase_run.timestamp))
      crash_result = CrashResult(return_code, result.time_executed, result.logs)
      log = testcase_manager.prepare_log_for_upload(
          crash_result.get_stacktrace(), return_code)
      testcase_manager.upload_log(log, log_time)

      for crash in result.crashes:
        testcase_manager.upload_testcase(crash.input_path, log_time)

      add_additional_testcase_run_data(testcase_run,
                                       self.fuzz_target.fully_qualified_name(),
                                       self.job_type, revision)
      upload_testcase_run_stats(testcase_run)
      if result.crashes:
        crashes.extend([
            Crash.from_engine_crash(crash) for crash in result.crashes if crash
        ])

    logs.log('All fuzzing rounds complete.')
    self.sync_new_corpus_files()

    return crashes, fuzzer_metadata

实际fuzz是run_engine_fuzzer

代码语言:javascript
代码运行次数:0
运行
复制
def run_engine_fuzzer(engine_impl, target_name, sync_corpus_directory,
                      testcase_directory):
  """Run engine for fuzzing."""
  if environment.is_trusted_host():
    from bot.untrusted_runner import tasks_host    # 这还搞了个untrusted的runner,太复杂了
    return tasks_host.engine_fuzz(engine_impl, target_name,
                                  sync_corpus_directory, testcase_directory)

  build_dir = environment.get_value('BUILD_DIR')
  target_path = engine_common.find_fuzzer_path(build_dir, target_name)
  options = engine_impl.prepare(sync_corpus_directory, target_path, build_dir)

  fuzz_test_timeout = environment.get_value('FUZZ_TEST_TIMEOUT')
  result = engine_impl.fuzz(target_path, options, testcase_directory,
                            fuzz_test_timeout)     # 调用对应引擎的fuzz函数

  logs.log('Used strategies.', strategies=options.strategies)
  for strategy, value in six.iteritems(options.strategies):
    result.stats['strategy_' + strategy] = value

  # Format logs with header and strategy information.
  log_header = engine_common.get_log_header(result.command,
                                            environment.get_value('BOT_NAME'),
                                            result.time_executed)

  formatted_strategies = engine_common.format_fuzzing_strategies(
      options.strategies)

  result.logs = log_header + '\n' + result.logs + '\n' + formatted_strategies

  fuzzer_metadata = {
      'fuzzer_binary_name': target_name,
  }

  fuzzer_metadata.update(engine_common.get_all_issue_metadata(target_path))
  _add_issue_metadata_from_environment(fuzzer_metadata)
  return result, fuzzer_metadata

引擎类

上面提到的运行fuzzer,是通过engine.get是获取引擎类,get函数如下

代码语言:javascript
代码运行次数:0
运行
复制
def get(name):
  """Get an implemntation of a fuzzing engine, or None if one does not exist."""
  engine_class = _ENGINES.get(name)
  if engine_class:
    return engine_class()

  return None

而之前得先注册

代码语言:javascript
代码运行次数:0
运行
复制
def register(name, engine_class):
  """Register a fuzzing engine."""
  if name in _ENGINES:
    raise ValueError('Engine {name} is already registered'.format(name=name))

  _ENGINES[name] = engine_class

而注册这个在src/python/bot/startup/run_bot.py的时候注册的

代码语言:javascript
代码运行次数:0
运行
复制
......
from bot.fuzzers import init as fuzzers_init
......
def main():

  if not profiler.start_if_needed('python_profiler_bot'):
    sys.exit(-1)

  fuzzers_init.run() # 在这里注册

跟过去可以看注册了libFuzzer,honggfuzz和syzkaller,有疑问的是咋没有afl

代码语言:javascript
代码运行次数:0
运行
复制
def run():
  """Initialise builtin fuzzing engines."""
  engine.register('libFuzzer', libFuzzer_engine.LibFuzzerEngine)
  engine.register('honggfuzz', honggfuzz_engine.HonggfuzzEngine)
  engine.register('syzkaller', syzkaller_engine.SyzkallerEngine)

do_blackbox_fuzzing

代码语言:javascript
代码运行次数:0
运行
复制
def do_blackbox_fuzzing(self, fuzzer, fuzzer_directory, job_type):
    """Run blackbox fuzzing. Currently also used for engine fuzzing."""
    # Set the thread timeout values.
    # TODO(ochang): Remove this hack once engine fuzzing refactor is compelte.
    fuzz_test_timeout = environment.get_value('FUZZ_TEST_TIMEOUT')
    if fuzz_test_timeout:
      test_timeout = set_test_timeout(fuzz_test_timeout,
                                      self.timeout_multiplier)
    else:
      test_timeout = self.test_timeout

    thread_timeout = test_timeout

    # Determine number of testcases to process.
    testcase_count = environment.get_value('MAX_TESTCASES')

    # For timeout multipler greater than 1, we need to decrease testcase count
    # to prevent exceeding task lease time.
    if self.timeout_multiplier > 1:
      testcase_count /= self.timeout_multiplier

    # Run the fuzzer to generate testcases. If error occurred while trying
    # to run the fuzzer, bail out.
    (error_occurred, testcase_file_paths, sync_corpus_directory,
     fuzzer_metadata) = self.generate_blackbox_testcases(
         fuzzer, fuzzer_directory, testcase_count)
	......
	......

在self.generate_blackbox_testcases里面是会实际启动fuzzer的,注释说的是Run the blackbox fuzzer and generate testcases.

代码语言:javascript
代码运行次数:0
运行
复制
def generate_blackbox_testcases(self, fuzzer, fuzzer_directory,
                                testcase_count):
  """Run the blackbox fuzzer and generate testcases."""
  # Helper variables.
  error_occurred = False
  fuzzer_revision = fuzzer.revision
  fuzzer_name = fuzzer.name
  sync_corpus_directory = None

  # Clear existing testcases (only if past task failed).
  testcase_directories = get_testcase_directories(self.testcase_directory,
                                                  self.data_directory)
  testcase_manager.remove_testcases_from_directories(testcase_directories)

  # Set an environment variable for fuzzer name.
  # TODO(ochang): Investigate removing this. Only users appear to be chromebot
  # fuzzer and fuzzer_logs, both of which can be removed.
  environment.set_value('FUZZER_NAME', fuzzer_name)

  # Set minimum redzone size, do not detect leaks and zero out the
  # quarantine size before running the fuzzer.
  environment.reset_current_memory_tool_options(
      redzone_size=16, leaks=False, quarantine_size_mb=0)

  if fuzzer.builtin:
    fuzzer_command = 'builtin'
    builtin_fuzzer = builtin_fuzzers.get(fuzzer.name)    #这里面有个afl和libfuzz

    builtin_result = builtin_fuzzer.run(
        self.data_directory, self.testcase_directory, testcase_count)

    fuzzer_output = builtin_result.output
    sync_corpus_directory = builtin_result.corpus_directory

    # Return code is always 0 as builtin fuzzers log errors directly.
    fuzzer_return_code = 0
  else:
    # Make sure we have a file to execute for the fuzzer.
 ......
 ......
 # 获取可执行文件fuzzer路径,应该是libfuzzer那样,二进制文件就是fuzzer
 # Get the fuzzer executable and chdir to its base directory. This helps to
    # prevent referencing every file using __file__.
    fuzzer_executable = os.path.join(fuzzer_directory, fuzzer.executable_path)
    fuzzer_executable_directory = os.path.dirname(fuzzer_executable)
 ......
 ......
 # Build the fuzzer command execution string.
    command = shell.get_execute_command(fuzzer_executable)
 ......
 command_format = ('%s --input_dir%s%s --output_dir%s%s --no_of_files%s%d')
    fuzzer_command = str(
        command_format % (command, argument_seperator, self.data_directory,
                          argument_seperator, self.testcase_directory,
                          argument_seperator, testcase_count))
    fuzzer_timeout = environment.get_value('FUZZER_TIMEOUT')

    # Run the fuzzer.   启动fuzzer
    logs.log('Running fuzzer - %s.' % fuzzer_command)
    fuzzer_return_code, fuzzer_duration, fuzzer_output = (
        process_handler.run_process(
            fuzzer_command,
            current_working_directory=fuzzer_executable_directory,
            timeout=fuzzer_timeout,
            testcase_run=False,
            ignore_children=False))

下面是builtin_fuzzer = builtin_fuzzers.get(fuzzer.name)所能获取到的

代码语言:javascript
代码运行次数:0
运行
复制
BUILTIN_FUZZERS = {
    'afl': afl.Afl(),
    'libFuzzer': libFuzzer.LibFuzzer(),
}


def get(fuzzer_name):
  """Get the builtin fuzzer with the given name, or None."""
  if fuzzer_name not in BUILTIN_FUZZERS:
    return None

  return BUILTIN_FUZZERS[fuzzer_name]

下面的do_blackbox_fuzzing函数的后半部分,是处理testcases的

代码语言:javascript
代码运行次数:0
运行
复制
............
............
    # Start processing the testcases.
    while test_number < len(testcase_file_paths):
      thread_index = 0
      threads = []

      temp_queue = process_handler.get_queue()
      if not temp_queue:
        process_handler.terminate_stale_application_instances()
        logs.log_error('Unable to create temporary crash queue.')
        break

      while thread_index < max_threads and test_number < len(
          testcase_file_paths):
        testcase_file_path = testcase_file_paths[test_number]
        gestures = testcases_metadata[testcase_file_path]['gestures']

        env_copy = environment.copy()
        thread = process_handler.get_process()(
            target=testcase_manager.run_testcase_and_return_result_in_queue,
            args=(temp_queue, thread_index, testcase_file_path, gestures,
                  env_copy, True))

        try:
          thread.start()
        except:
          process_handler.terminate_stale_application_instances()
          thread_error_occurred = True
          logs.log_error('Unable to start new thread.')
          break

        threads.append(thread)
        thread_index += 1
        test_number += 1

        if test_number % testcases_before_stale_process_cleanup == 0:
          needs_stale_process_cleanup = True

        time.sleep(thread_delay)
............
............

上面调用了run_testcase_and_return_result_in_queue,它是运行一个testcases,并且上传crash信息了

代码语言:javascript
代码运行次数:0
运行
复制
def run_testcase_and_return_result_in_queue(crash_queue,
                                            thread_index,
                                            file_path,
                                            gestures,
                                            env_copy,
                                            upload_output=False):
  """Run a single testcase and return crash results in the crash queue."""
  # Since this is running in its own process, initialize the log handler again.
  # This is needed for Windows where instances are not shared across child
  # processes. See:
  # https://stackoverflow.com/questions/34724643/python-logging-with-multiprocessing-root-logger-different-in-windows
  logs.configure('run_testcase', {
      'testcase_path': file_path,
  })

  # Also reinitialize NDB context for the same reason as above.
  with ndb_init.context():
    _do_run_testcase_and_return_result_in_queue(
        crash_queue,
        thread_index,
        file_path,
        gestures,
        env_copy,
        upload_output=upload_output)

里面又调用了_do_run_testcase_and_return_result_in_queue,里面就是上传CrashResult了,根据这,实际的fuzz代码应该就是self.generate_blackbox_testcases

代码语言:javascript
代码运行次数:0
运行
复制
def _do_run_testcase_and_return_result_in_queue(crash_queue,
                                                thread_index,
                                                file_path,
                                                gestures,
                                                env_copy,
                                                upload_output=False):
  """Run a single testcase and return crash results in the crash queue."""
  try:
    # Run testcase and check whether a crash occurred or not.
    return_code, crash_time, output = run_testcase(thread_index, file_path,
                                                   gestures, env_copy)

    # Pull testcase directory to host to get any stats files.
    if environment.is_trusted_host():
      from bot.untrusted_runner import file_host
      file_host.pull_testcases_from_worker()

    # Analyze the crash.
    crash_output = _get_crash_output(output)
    crash_result = CrashResult(return_code, crash_time, crash_output)

    # To provide consistency between stats and logs, we use timestamp taken
    # from stats when uploading logs and testcase.
    if upload_output:
      log_time = _get_testcase_time(file_path)

    if crash_result.is_crash():
      # Initialize resource list with the testcase path.
      resource_list = [file_path]
      resource_list += get_resource_paths(crash_output)

      # Store the crash stack file in the crash stacktrace directory
      # with filename as the hash of the testcase path.
      crash_stacks_directory = environment.get_value('CRASH_STACKTRACES_DIR')
      stack_file_path = os.path.join(crash_stacks_directory,
                                     utils.string_hash(file_path))
      utils.write_data_to_file(crash_output, stack_file_path)

      # Put crash/no-crash results in the crash queue.
      crash_queue.put(
          Crash(
              file_path=file_path,
              crash_time=crash_time,
              return_code=return_code,
              resource_list=resource_list,
              gestures=gestures,
              stack_file_path=stack_file_path))

      # Don't upload uninteresting testcases (no crash) or if there is no log to
      # correlate it with (not upload_output).
      if upload_output:
        upload_testcase(file_path, log_time)

    if upload_output:
      # Include full output for uploaded logs (crash output, merge output, etc).
      crash_result_full = CrashResult(return_code, crash_time, output)
      log = prepare_log_for_upload(crash_result_full.get_stacktrace(),
                                   return_code)
      upload_log(log, log_time)
  except Exception:
    logs.log_error('Exception occurred while running '
                   'run_testcase_and_return_result_in_queue.')

如何获取要运行的target

我们上传zip包,但是里面可能有多个target,有些可能只是fuzzer所需文件,那怎么找到要运行的target呢

我们跟踪一下

是fuzz task,就进来src/python/bot/tasks/fuzz_task.py执行execute_task

代码语言:javascript
代码运行次数:0
运行
复制
def execute_task(fuzzer_name, job_type):
  """Runs the given fuzzer for one round."""
  test_timeout = environment.get_value('TEST_TIMEOUT')
  session = FuzzingSession(fuzzer_name, job_type, test_timeout)
  session.run()

上面调用FuzzingSession类里面的run函数,在run函数里面https://github.com/google/clusterfuzz/blob/9c2065a7f7b7802936b1133733402adc65ac0c4c/src/python/bot/tasks/fuzz_task.py#L1843这里调用了build_manager.setup_build

代码语言:javascript
代码运行次数:0
运行
复制
build_setup_result = build_manager.setup_build(
    environment.get_value('APP_REVISION'), target_weights=target_weights)

跟进build_manager.setup_build

代码语言:javascript
代码运行次数:0
运行
复制
def setup_build(revision=0, target_weights=None):
  """Set up a custom or regular build based on revision."""
  # For custom binaries we always use the latest version. Revision is ignored.
  custom_binary = environment.get_value('CUSTOM_BINARY')
  if custom_binary:
    return setup_custom_binary(target_weights=target_weights)

  # In this case, we assume the build is already installed on the system.
  system_binary = environment.get_value('SYSTEM_BINARY_DIR')
  if system_binary:
    return setup_system_binary()

  fuzz_target_build_bucket_path = environment.get_value(
      'FUZZ_TARGET_BUILD_BUCKET_PATH')
  if fuzz_target_build_bucket_path:
    # Split fuzz target build.
    return _setup_split_targets_build(
        fuzz_target_build_bucket_path, target_weights, revision=revision)

  if revision:
    # Setup regular build with revision.
    return setup_regular_build(revision, target_weights=target_weights)

  # If no revision is provided, we default to a trunk build.
  bucket_paths = []
  for env_var in DEFAULT_BUILD_BUCKET_PATH_ENV_VARS:
    bucket_path = environment.get_value(env_var)
    if bucket_path:
      bucket_paths.append(bucket_path)

  return setup_trunk_build(bucket_paths, target_weights=target_weights)

setup_build 中的setup_custom_binary

首先是setup_custom_binary,里面调用了CustomBuild的setup

代码语言:javascript
代码运行次数:0
运行
复制
build = CustomBuild(
      base_build_dir,
      job.custom_binary_key,
      job.custom_binary_filename,
      job.custom_binary_revision,
      target_weights=target_weights)

  # Revert back the actual job name.
  if share_build_job_type:
    environment.set_value('JOB_NAME', old_job_name)

  if build.setup():
    return build

看setup

代码语言:javascript
代码运行次数:0
运行
复制
def setup(self):
  """Set up the custom binary for a particular job."""
  self._pre_setup()

  # Track the key for the custom binary so we can create a download link
  # later.
  environment.set_value('BUILD_KEY', self.custom_binary_key)

  logs.log('Retrieving custom binary build r%d.' % self.revision)

  revision_file = os.path.join(self.build_dir, REVISION_FILE_NAME)
  build_update = revisions.needs_update(revision_file, self.revision)

  if build_update:
    if not self._unpack_custom_build():    # 解压上传的压缩包,里面调用了archive.unpack
      return False

    logs.log('Retrieved custom binary build r%d.' % self.revision)
  else:
    logs.log('Build already exists.')

    _set_random_fuzz_target_for_fuzzing_if_needed(
        self._get_fuzz_targets_from_dir(self.build_dir), self.target_weights)

  self._setup_application_path(build_update=build_update)
  self._post_setup_success(update_revision=build_update)
  return True

上面的_set_random_fuzz_target_for_fuzzing_if_needed就选择压缩包中的二进制文件了

代码语言:javascript
代码运行次数:0
运行
复制
def _set_random_fuzz_target_for_fuzzing_if_needed(fuzz_targets, target_weights):
  """Sets a random fuzz target for fuzzing."""
  fuzz_target = environment.get_value('FUZZ_TARGET')
  if fuzz_target:
    logs.log('Use previously picked fuzz target %s for fuzzing.' % fuzz_target)
    return fuzz_target

  if not environment.is_engine_fuzzer_job():
    return None

  fuzz_targets = list(fuzz_targets)
  if not fuzz_targets:
    logs.log_error('No fuzz targets found. Unable to pick random one.')
    return None

  environment.set_value('FUZZ_TARGET_COUNT', len(fuzz_targets))

  fuzz_target = fuzzer_selection.select_fuzz_target(fuzz_targets,
                                                    target_weights)
  environment.set_value('FUZZ_TARGET', fuzz_target)
  logs.log('Picked fuzz target %s for fuzzing.' % fuzz_target)

  return fuzz_target

但我们初步选择的是_get_fuzz_targets_from_dir,之后根据这个结果,在通过target_weights从里面选,所以第一部选还是在_get_fuzz_targets_from_dir,确保他是一个fuzzer

CustomBuild没有这个函数,那实际就是父类的_get_fuzz_targets_from_dir,可以看到是调用get_fuzz_targets

代码语言:javascript
代码运行次数:0
运行
复制
def _get_fuzz_targets_from_dir(self, build_dir):
   """Get iterator of fuzz targets from build dir."""
   # Import here as this path is not available in App Engine context.
   from bot.fuzzers import utils as fuzzer_utils

   for path in fuzzer_utils.get_fuzz_targets(build_dir):
     yield os.path.splitext(os.path.basename(path))[0]

而这个fuzzer_utils.get_fuzz_targetsfrom bot.fuzzers import utils as fuzzer_utils,那就是src/python/bot/fuzzers/utils.py里面的

代码语言:javascript
代码运行次数:0
运行
复制
def get_fuzz_targets(path):
  """Get list of fuzz targets paths."""
  if environment.is_trusted_host():
    from bot.untrusted_runner import file_host
    return file_host.get_fuzz_targets(path)
  return get_fuzz_targets_local(path)

继续跟进get_fuzz_targets_local

代码语言:javascript
代码运行次数:0
运行
复制
def get_fuzz_targets_local(path):
  """Get list of fuzz targets paths (local)."""
  fuzz_target_paths = []

  for root, _, files in shell.walk(path):
    for filename in files:
      file_path = os.path.join(root, filename)
      if is_fuzz_target_local(file_path):
        fuzz_target_paths.append(file_path)

  return fuzz_target_paths

就是is_fuzz_target_local

1、首先得名字得满足正则VALID_TARGET_NAME 2、后缀名是ALLOWED_FUZZ_TARGET_EXTENSIONS,即无后缀,exe或者par 3、最后就是文件中得有FUZZ_TARGET_SEARCH_BYTES,也即LLVMFuzzerTestOneInput这个函数

代码语言:javascript
代码运行次数:0
运行
复制
ALLOWED_FUZZ_TARGET_EXTENSIONS = ['', '.exe', '.par']
FUZZ_TARGET_SEARCH_BYTES = b'LLVMFuzzerTestOneInput'
VALID_TARGET_NAME = re.compile(r'^[a-zA-Z0-9_-]+$')

def is_fuzz_target_local(file_path, file_handle=None):
  #TODO(hzawawy): handle syzkaller case.
  """Returns whether |file_path| is a fuzz target binary (local path)."""
  filename, file_extension = os.path.splitext(os.path.basename(file_path))
  if not VALID_TARGET_NAME.match(filename):
    # Check fuzz target has a valid name (without any special chars).
    return False

  if file_extension not in ALLOWED_FUZZ_TARGET_EXTENSIONS:
    # Ignore files with disallowed extensions (to prevent opening e.g. .zips).
    return False

  if not file_handle and not os.path.exists(file_path):
    # Ignore non-existant files for cases when we don't have a file handle.
    return False

  if filename.endswith('_fuzzer'):
    return True

  # TODO(aarya): Remove this optimization if it does not show up significant
  # savings in profiling results.
  fuzz_target_name_regex = environment.get_value('FUZZER_NAME_REGEX')
  if fuzz_target_name_regex:
    return bool(re.match(fuzz_target_name_regex, filename))

  if os.path.exists(file_path) and not stat.S_ISREG(os.stat(file_path).st_mode):
    # Don't read special files (eg: /dev/urandom).
    logs.log_warn('Tried to read from non-regular file: %s.' % file_path)
    return False

  # Use already provided file handle or open the file.
  local_file_handle = file_handle or open(file_path, 'rb')

  # TODO(metzman): Bound this call so we don't read forever if something went
  # wrong.
  result = utils.search_bytes_in_file(FUZZ_TARGET_SEARCH_BYTES,
                                      local_file_handle)

  if not file_handle:
    # If this local file handle is owned by our function, close it now.
    # Otherwise, it is caller's responsibility.
    local_file_handle.close()

  return result
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-05-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 总览
  • 获取任务
  • 执行任务
    • do_engine_fuzzing
    • 引擎类
    • do_blackbox_fuzzing
  • 如何获取要运行的target
    • setup_build 中的setup_custom_binary
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档