Coinbase API v2 是加密货币交易所 Coinbase 提供的应用程序接口,允许开发者获取市场数据、执行交易和管理账户。获取历史价格是其中一项重要功能,用于分析加密货币的价格走势。
Coinbase API v2 提供了以下主要端点来获取历史价格数据:
GET https://api.coinbase.com/v2/prices/:currency_pair/historic
其中 :currency_pair
是货币对,如 BTC-USD
。
date
(必填): 请求历史价格的日期,格式为 YYYY-MM-DDperiod
(可选): 数据粒度,可以是 year
, month
, week
, day
, hour
, 或 minute
import requests
import datetime
def get_historical_price(date, currency_pair="BTC-USD"):
url = f"https://api.coinbase.com/v2/prices/{currency_pair}/historic"
params = {
"date": date,
"period": "day" # 获取每日价格
}
response = requests.get(url, params=params)
if response.status_code == 200:
return response.json()
else:
raise Exception(f"API请求失败: {response.status_code}")
# 获取单日价格
price_data = get_historical_price("2023-01-01")
print(price_data)
def get_multiple_days_historical_prices(start_date, end_date, currency_pair="BTC-USD"):
date_format = "%Y-%m-%d"
start = datetime.datetime.strptime(start_date, date_format)
end = datetime.datetime.strptime(end_date, date_format)
delta = end - start
all_prices = []
for i in range(delta.days + 1):
current_date = (start + datetime.timedelta(days=i)).strftime(date_format)
try:
price_data = get_historical_price(current_date, currency_pair)
all_prices.append(price_data)
print(f"成功获取 {current_date} 的价格数据")
except Exception as e:
print(f"获取 {current_date} 价格失败: {str(e)}")
return all_prices
# 示例:获取2023年1月1日至2023年1月7日的价格数据
prices = get_multiple_days_historical_prices("2023-01-01", "2023-01-07")
Coinbase API 有请求频率限制。免费账户通常每分钟3-10次请求。解决方案:
import time
def get_with_rate_limit(date):
time.sleep(6) # 6秒延迟,确保不超过每分钟10次限制
return get_historical_price(date)
可能原因:
解决方案:
def get_with_retry(date, max_retries=3):
for attempt in range(max_retries):
try:
return get_historical_price(date)
except Exception as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # 指数退避
当需要获取长时间段的数据时:
import concurrent.futures
def get_concurrent_historical_prices(dates):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_date = {executor.submit(get_with_rate_limit, date): date for date in dates}
results = []
for future in concurrent.futures.as_completed(future_to_date):
date = future_to_date[future]
try:
data = future.result()
results.append(data)
except Exception as e:
print(f"{date} generated an exception: {str(e)}")
return results
如果Coinbase API无法满足需求,可以考虑:
通过合理实现上述方法和注意事项,您可以有效地使用Coinbase API v2获取连续多日的历史价格数据,并应用于各种加密货币分析和交易场景。
没有搜到相关的文章