原文:https://medium.com/modern-nlp/10-great-ml-practices-for-python-developers-b089eefc18fc
作者:Pratik Bhavsar
有时候作为一名数据科学家,我们可能会忘记自己的主要职责。我们首先是开发者,接着是研究者,最后还可能是数学家。因此,我们最重要的责任就是快速开发出一个没有错误的解决方案。
只因为我们能创建模型,并不意味着我们就是神,这不会带给我们可以写出糟糕的代码的自由。
从我开始进入这个领域,我曾经犯下很多错误,并考虑分享一些我所知道的对于机器学习工程开发中最常用的技能。在我看来,这些也是工业界目前最缺乏的一些技能。
如果必须在一个优秀的数据科学家和一个优秀的机器学习工程师中选择招聘一个,我会选择后者。
下面就开始介绍 10 个实用的机器学习建议吧。
当你开始写抽象类的时候,你就知道它可以让你的代码库变得很清晰明了,它们会强制采用一样的方法和方法名字。如果同个项目有多个人参与,并且每个人都用不同方法,那会产生不必要的混乱情况。
下面是一个代码例子:
import os
from abc import ABCMeta, abstractmethod
class DataProcessor(metaclass=ABCMeta):
"""Base processor to be used for all preparation."""
def __init__(self, input_directory, output_directory):
self.input_directory = input_directory
self.output_directory = output_directory
@abstractmethod
def read(self):
"""Read raw data."""
@abstractmethod
def process(self):
"""Processes raw data. This step should create the raw dataframe with all the required features. Shouldn't implement statistical or text cleaning."""
@abstractmethod
def save(self):
"""Saves processed data."""
class Trainer(metaclass=ABCMeta):
"""Base trainer to be used for all models."""
def __init__(self, directory):
self.directory = directory
self.model_directory = os.path.join(directory, 'models')
@abstractmethod
def preprocess(self):
"""This takes the preprocessed data and returns clean data. This is more about statistical or text cleaning."""
@abstractmethod
def set_model(self):
"""Define model here."""
@abstractmethod
def fit_model(self):
"""This takes the vectorised data and returns a trained model."""
@abstractmethod
def generate_metrics(self):
"""Generates metric with trained model and test data."""
@abstractmethod
def save_model(self, model_name):
"""This method saves the model in our required format."""
class Predict(metaclass=ABCMeta):
"""Base predictor to be used for all models."""
def __init__(self, directory):
self.directory = directory
self.model_directory = os.path.join(directory, 'models')
@abstractmethod
def load_model(self):
"""Load model here."""
@abstractmethod
def preprocess(self):
"""This takes the raw data and returns clean data for prediction."""
@abstractmethod
def predict(self):
"""This is used for prediction."""
class BaseDB(metaclass=ABCMeta):
""" Base database class to be used for all DB connectors."""
@abstractmethod
def get_connection(self):
"""This creates a new DB connection."""
@abstractmethod
def close_connection(self):
"""This closes the DB connection."""
实验的可复现是非常重要的一件事情,而随机种子可能会造成实验结果无法复现。因此必须固定好随机种子,否则会导致不同的训练集和测试集,以及神经网络的不同初始化权重,这些都会导致不一样的实验结果。
def set_seed(args):
random.seed(args.seed)
np.random.seed(args.seed)
torch.manual_seed(args.seed)
if args.n_gpu > 0:
torch.cuda.manual_seed_all(args.seed)
如果你的数据量非常大,然后你接下来要做的工作是类似于清洗数据或者建模,那么可以每次采用少量的数据来避免一次加载大量的数据。当然这个做法的前提是你只是希望测试代码,而不是实际开始实现相应的工作。
这个做法非常实用,特别是你本地电脑的配置不足以加载全部数据集的时候,但你又想在本地电脑采用 Jupyter/ VS code/ Atom 做实验。
代码例子如下:
df_train = pd.read_csv(‘train.csv’, nrows=1000)
记得每次都要检查数据是否存在空数据(NA),因为这将带来代码出错。当然,即便当前数据中不存在,这并不意味着在后续的训练步骤中不会出现这种情况,所以需要保持这种检查。
比如采用下述的代码:
print(len(df))
df.isna().sum()
df.dropna()
print(len(df))
当在处理大量的数据的时候,如果能够知道总共需要的时间以及当前的处理进度是非常有帮助的。
这里有几种方法:
第一种方法:采用 tqdm
库,代码例子:
from tqdm import tqdm
import time
tqdm.pandas()
df['col'] = df['col'].progress_apply(lambda x: x**2)
text = ""
for char in tqdm(["a", "b", "c", "d"]):
time.sleep(0.25)
text = text + char
第二种方法--fastprogress
from fastprogress.fastprogress import master_bar, progress_bar
from time import sleep
mb = master_bar(range(10))
for i in mb:
for j in progress_bar(range(100), parent=mb):
sleep(0.01)
mb.child.comment = f'second bar stat'
mb.first_bar.comment = f'first bar stat'
mb.write(f'Finished loop {i}.')
效果如下图所示:
如果你采用 pandas
库,那么你会发现有的时候它的速度会有多慢,特别是采用 groupby
函数的时候。不需要想尽办法来寻找更好的加速方法,只需要修改一行代码即可,如下所示,采用 modin
即可解决这个问题:
import modin.pandas as pd
不是所有的函数运行时间都是一样的。
即便你的代码都跑通了,但也不表示你写出一手好代码。有些软性错误(soft-bugs)可能会导致你的代码运行算的变慢,因此很有必要找到这些问题。可以采用下述装饰器来打印函数的运行时间。
import time
from functools import wraps
def timing(f):
"""Decorator for timing functions
Usage:
@timing
def function(a):
pass
"""
@wraps(f)
def wrapper(*args, **kwargs):
start = time.time()
result = f(*args, **kwargs)
end = time.time()
print('function:%r took: %2.4f sec' % (f.__name__, end - start))
return result
return wrapper
使用的例子如下所示:
from time import sleep
@timing
def count_nums():
sleep(1)
结果如下所示:
没有人会喜欢一个浪费云资源的工程师
有些实验是需要跑上数十个小时的,这些实验很难进行跟踪并在实验完成的时候关闭云服务器的实例。我曾经犯过这样的错误,同时也看到其他人也曾经因为这个忘记关闭好几天。
这种情况通常发现在周五的时候,然后让其一直运行到周一。
为了避免发生这种情况,可以在代码的最后加入下列代码。
不过,注意要主要代码放在 try catch
中进行捕获异常,以防止发生错误。这种情况也是很可能发生的。
import os
def run_command(cmd):
return os.system(cmd)
def shutdown(seconds=0, os='linux'):
"""Shutdown system after seconds given. Useful for shutting EC2 to save costs."""
if os == 'linux':
run_command('sudo shutdown -h -t sec %s' % seconds)
elif os == 'windows':
run_command('shutdown -s -t %s' % seconds)
在训练模型后,所有的想法最终都是来自错误和评判指标的分析。因此需要创建并保存好一个格式不错的报告,以便进行汇报。
下面是一个代码例子:
import json
import os
from sklearn.metrics import (accuracy_score, classification_report,
confusion_matrix, f1_score, fbeta_score)
def get_metrics(y, y_pred, beta=2, average_method='macro', y_encoder=None):
if y_encoder:
y = y_encoder.inverse_transform(y)
y_pred = y_encoder.inverse_transform(y_pred)
return {
'accuracy': round(accuracy_score(y, y_pred), 4),
'f1_score_macro': round(f1_score(y, y_pred, average=average_method), 4),
'fbeta_score_macro': round(fbeta_score(y, y_pred, beta, average=average_method), 4),
'report': classification_report(y, y_pred, output_dict=True),
'report_csv': classification_report(y, y_pred, output_dict=False).replace('\n','\r\n')
}
def save_metrics(metrics: dict, model_directory, file_name):
path = os.path.join(model_directory, file_name + '_report.txt')
classification_report_to_csv(metrics['report_csv'], path)
metrics.pop('report_csv')
path = os.path.join(model_directory, file_name + '_metrics.json')
json.dump(metrics, open(path, 'w'), indent=4)
你可以很好完成数据清理和训练模型,但是也还可能在最后制造很大的错误,比如没有写好服务接口。我的经验告诉我,很多人其实不知道如果写出一个好的服务接口,文档说明和服务安装配置。后面我会写另一篇文章介绍,但现在先简单介绍一下。
下面是一个很好的用于经典的机器学习和深度学习的部署策略,但注意是请求数量不大的时候,比如每分钟1000次。 一个组合:Fastapi + uvicorn + gunicorn
fastapi
写接口是非常快速的,正如[1]中的报告展示,以及原因可以参考[2];fastapi
有免费的官方文档以及可以通过 http:url/docs
进行测试,并且这个链接可以自动生成以及随着我们修改代码而自动改变;gunicorn
服务器部署接口是因为它具有开始多于 1 个 worker
的功能,并且你应该至少保持 2 个。运行下列命令可以部署使用 4 个 wokers
,另外可以通过测试来优化这个 workers
的数量。
pip install fastapi uvicorn gunicorn
gunicorn -w 4 -k uvicorn.workers.UvicornH11Worker main:app
一个运行例子如下图所示: