PySpark on HPC系列记录了我独自探索在HPC利用PySpark处理大数据业务数据的过程,由于这方面资料少或者搜索能力不足,没有找到需求匹配的框架,不得不手搓一个工具链,容我虚荣点,叫“框架”。框架的实现功能如下:
如上图所示,另外有几个注意点:
具体任务处理脚本有几点注意事项:
...
os.environ["PYSPARK_PYTHON"] = "<env_path>/python"
os.environ["SPARK_HOME"] = "<env_path>/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-10.9-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")
sys.path.insert(0, '<work_path>')
...
def process_raw(spark, in_file, file_output, out_csv_path):
raw_to_csv(spark, in_file, out_csv_path)
csv_to_zip(out_csv_path, file_output)
shutil.rmtree(out_csv_path)
def process_job_file(in_file,spark):
df = pd.read_csv(in_file)
for index, row in df.iterrows():
in_file, out_file, tmp_path = row['in_file'],row['out_file'],row['tmp_path']
process_raw(spark, in_file, out_file, tmp_path)
def get_parser():
parser = argparse.ArgumentParser(description='...')
parser.add_argument("-j", help="job type", dest="job_type",default='process_raw')
# process_raw:
parser.add_argument("-i", help="input job file", dest="input_file")
# generate job file
parser.add_argument("-b", help="one job batch size", dest="batch_size",default=8)
parser.add_argument("-g", help="generate job file root", dest="gen_file_root", default='./jobs')
parser.add_argument("-r", help="raw data root", dest="raw_data_root")
parser.add_argument("-t", help="target data root", dest="tar_data_root")
if __name__ == '__main__':
parser = get_parser()
args = parser.parse_args()
if args.job_type == 'process_raw' and args.input_file is not None:
spark = get_spark()
process_job_file(args.input_file,spark)
elif args.job_type == 'gen_job_file':
generate_jobfile_from_folder(args.raw_data_root, args.tar_data_root, batch_size=args.batch_size, job_file_folder=args.gen_file_root)
else:
parser.print_help()
#!/usr/bin/env bash
#SBATCH --job-name=<job_file>
#SBATCH --time=1:00:00
#SBATCH --nodes=1
#SBATCH --cpus-per-task=8
#SBATCH --mem=12G
#SBATCH --output=<job_name>-%j.out
#SBATCH --error=<job_name>-%j.err
<path_to_env>/python <process_file_path>.py -i $1
调用方法
sbatch spark-hpc-batch.sh <job_file_path>
#!/usr/bin/env bash
#SBATCH --job-name=<job_file>
#SBATCH --time=1:00:00
#SBATCH --nodes=1
#SBATCH --cpus-per-task=8
#SBATCH --mem=12G
#SBATCH --output=<job_name>-%j.out
#SBATCH --error=<job_name>-%j.err
JOB_FILE_ROOT=$1
<path_to_env>/python <process_file_path>.py -i $1 "$JOB_FILE_ROOT/$SLURM_ARRAY_TASK_ID.csv"
调用方法
sbatch --array=0-29 spark-hpc-batch-array.sh <job_file_root>
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。