将CSV文件导入Django模型是将结构化数据从逗号分隔值(CSV)文件加载到Django数据库模型中的过程。这是数据迁移、批量导入或系统初始化的常见需求。
import csv
from django.core.management.base import BaseCommand
from yourapp.models import YourModel
class Command(BaseCommand):
    """Management command that loads every row of a CSV file into ``YourModel``.

    Each CSV row becomes one ``YourModel.objects.create(**row)`` call, so the
    CSV header names are used directly as the keyword arguments.
    """

    help = 'Import CSV file into Django model'

    def add_arguments(self, parser):
        """Register the positional ``csv_file`` argument (path to the CSV)."""
        parser.add_argument('csv_file', type=str, help='Path to the CSV file')

    def handle(self, *args, **kwargs):
        """Read the file named by ``kwargs['csv_file']`` and create one record per row."""
        csv_file = kwargs['csv_file']
        # newline='' leaves newline handling to the csv module, so quoted
        # fields that contain line breaks are parsed as a single value.
        with open(csv_file, 'r', newline='') as file:
            reader = csv.DictReader(file)
            for row in reader:
                # Assumes the CSV column names match the model field names.
                YourModel.objects.create(**row)
        self.stdout.write(self.style.SUCCESS('Successfully imported data'))
import pandas as pd
from django.core.management.base import BaseCommand
from yourapp.models import YourModel
class Command(BaseCommand):
    """Management command that imports a CSV file by way of a pandas DataFrame."""

    help = 'Import CSV using pandas'

    def add_arguments(self, parser):
        """Declare the single positional ``csv_file`` argument."""
        parser.add_argument('csv_file', type=str)

    def handle(self, *args, **kwargs):
        """Load the CSV into a DataFrame and create one ``YourModel`` per row."""
        frame = pd.read_csv(kwargs['csv_file'])
        # The row index yielded by iterrows() is not needed for the import.
        for _, record in frame.iterrows():
            field_values = record.to_dict()
            YourModel.objects.create(**field_values)
        self.stdout.write(self.style.SUCCESS('Data imported successfully'))
import csv
from django.core.management.base import BaseCommand
from yourapp.models import YourModel
class Command(BaseCommand):
    """Management command that imports a CSV file with a single ``bulk_create``.

    All rows are first turned into unsaved ``YourModel`` instances and then
    handed to ``bulk_create`` in one call.
    """

    help = 'Bulk import CSV data'

    def add_arguments(self, parser):
        """Register the positional ``csv_file`` argument (path to the CSV)."""
        parser.add_argument('csv_file', type=str)

    def handle(self, *args, **kwargs):
        """Build one unsaved instance per CSV row, insert them, and report the count."""
        csv_file = kwargs['csv_file']
        # newline='' leaves newline handling to the csv module, so quoted
        # fields that contain line breaks are parsed as a single value.
        with open(csv_file, 'r', newline='') as file:
            objects = [YourModel(**row) for row in csv.DictReader(file)]
        YourModel.objects.bulk_create(objects)
        self.stdout.write(self.style.SUCCESS(f'Imported {len(objects)} records'))
问题:CSV列名与模型字段名不一致。解决方案:在导入前将CSV列名映射为模型字段名
# Maps each CSV column name to the model field that receives its value.
field_mapping = dict(
    csv_column1='model_field1',
    csv_column2='model_field2',
)
# Re-key the row by model field name; a CSV column that is missing from
# field_mapping raises KeyError.
data = {
    field_mapping[csv_column]: cell
    for csv_column, cell in row.items()
}
YourModel.objects.create(**data)
问题:CSV中读取的值都是字符串,与模型字段的类型不一致。解决方案:在导入前转换数据类型
from datetime import datetime

# Assumes the CSV has a 'date_field' column whose values look like YYYY-MM-DD.
raw_date = row['date_field']
row['date_field'] = datetime.strptime(raw_date, '%Y-%m-%d')
问题:模型包含外键等关联字段,CSV中只有关联对象的名称。解决方案:先获取或创建相关对象
# Fetch the related row by name, creating it first when it does not exist.
related_name = row['related_name']
related_obj, created = RelatedModel.objects.get_or_create(name=related_name)

# Create the main record pointing at the related object obtained above.
new_record_fields = {
    'name': row['name'],
    'related_field': related_obj,
}
YourModel.objects.create(**new_record_fields)
问题:CSV文件很大时,一次性创建所有对象会占用过多内存。解决方案:分批导入
# Number of model instances sent to the database per bulk_create call.
batch_size = 1000
objects = []
# newline='' leaves newline handling to the csv module, so quoted fields
# that contain line breaks are parsed as a single value.
with open(csv_file, 'r', newline='') as file:
    reader = csv.DictReader(file)
    for row in reader:
        objects.append(YourModel(**row))
        # Flush only once a full batch has accumulated. Testing the row index
        # with `i % batch_size == 0` is already true at i == 0, which would
        # flush a one-row batch right after the first row.
        if len(objects) >= batch_size:
            YourModel.objects.bulk_create(objects)
            objects = []
if objects:  # Import the remaining records.
    YourModel.objects.bulk_create(objects)
import csv
from django.core.management.base import BaseCommand
from yourapp.models import YourModel
from django.db import transaction
class Command(BaseCommand):
    """Management command that imports a CSV row by row and reports failures.

    A row that cannot be imported is recorded and skipped; the remaining rows
    are still processed. All successfully created rows are committed together
    when the outer transaction finishes.
    """

    help = 'Import CSV with error handling'

    def add_arguments(self, parser):
        """Register the positional ``csv_file`` argument (path to the CSV)."""
        parser.add_argument('csv_file', type=str)

    def handle(self, *args, **kwargs):
        """Import the file named by ``kwargs['csv_file']`` and print a summary.

        Writes the number of imported rows to stdout, followed by the number
        of failed rows and one "Row N: <error>" line per failure, where N
        counts data rows starting at 1.
        """
        # Imported here so the command does not depend on a module-level
        # import of datetime being present.
        from datetime import datetime

        csv_file = kwargs['csv_file']
        success_count = 0
        error_count = 0
        errors = []

        # newline='' leaves newline handling to the csv module, so quoted
        # fields that contain line breaks are parsed as a single value.
        with open(csv_file, 'r', newline='') as file:
            reader = csv.DictReader(file)
            with transaction.atomic():
                for row_num, row in enumerate(reader, 1):
                    try:
                        # Pre-process the data.
                        if 'date_field' in row:
                            row['date_field'] = datetime.strptime(row['date_field'], '%Y-%m-%d')
                        # Create the record inside its own savepoint: a database
                        # error caught directly inside the outer atomic block
                        # would leave the transaction unusable for later rows
                        # and roll back the whole import on exit.
                        with transaction.atomic():
                            YourModel.objects.create(**row)
                    except Exception as e:
                        # Deliberately broad: any per-row failure is recorded
                        # and reported below rather than aborting the import.
                        error_count += 1
                        errors.append(f"Row {row_num}: {str(e)}")
                    else:
                        success_count += 1

        # Print the results.
        self.stdout.write(self.style.SUCCESS(f'Successfully imported {success_count} records'))
        if error_count > 0:
            self.stdout.write(self.style.ERROR(f'Failed to import {error_count} records'))
            for error in errors:
                self.stdout.write(self.style.ERROR(error))
通过以上方法和注意事项,您可以高效、可靠地将CSV数据导入Django模型。
没有搜到相关的文章