#pandas #multithreading #multiprocessing
Вопрос:
У меня ниже приведен код, который считывает из csv-файла несколько символов тикера в фрейм данных.
Каждый тикер вызывает веб-Api, возвращающий df-кадр daf, который затем присоединяется к последнему до завершения.
Код работает , но при использовании большого количества тикеров код сильно замедляется.
Я понимаю, что могу использовать многопроцессорную обработку и потоки для ускорения своего кода, но не знаю, с чего начать и что было бы наиболее подходящим в моем конкретном случае.
Какой код я должен использовать, чтобы как можно быстрее поместить мои данные в комбинированный фрейм?
import pandas as pd
import numpy as np
import json
tickers=pd.read_csv("D:/verhuizen/pensioen/MULTI.csv",names=['symbol','company'])
read_str='https://financialmodelingprep.com/api/v3/income-statement/AAPL?limit=120amp;apikey=demo'
df = pd.read_json (read_str)
df = pd.DataFrame(columns=df.columns)
for ind in range(len(tickers)):
read_str='https://financialmodelingprep.com/api/v3/income-statement/' tickers['symbol'][ind] '?limit=120amp;apikey=demo'
df1 = pd.read_json (read_str)
df=pd.concat([df,df1], ignore_index=True)
df.set_index(['date','symbol'], inplace=True)
df.sort_index(inplace=True)
df.to_csv('D:/verhuizen/pensioen/MULTI_out.csv')
Приведенный код отлично работает для небольших наборов данных, но когда я использую большое количество тикеров (>4000), в какой-то момент я получаю следующую ошибку. Это потому, что веб-api перегружен или есть другая проблема?
Traceback (most recent call last):
File "D:/Verhuizen/Pensioen/Equity_Extractor_2021.py", line 43, in <module>
data = pool.starmap(download_data, enumerate(TICKERS, start=1))
File "C:UsersMLUYAppDataLocalProgramsPythonPython37-32libmultiprocessingpool.py", line 276, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "C:UsersMLUYAppDataLocalProgramsPythonPython37-32libmultiprocessingpool.py", line 657, in get
raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x00C33E30>'. Reason: 'TypeError("cannot serialize '_io.BufferedReader' object")'
Process finished with exit code 1
Он продолжает выдавать ту же ошибку (для большего количества тикеров).
код в точности такой, как указано:
def download_data(pool_id, symbols):
df = []
for symbol in symbols:
print("[{:02}]: {}".format(pool_id, symbol))
#do stuff here
read_str = BASEURL.format(symbol)
df.append(pd.read_json(read_str))
#df.append(pd.read_json(fake_data(symbol)))
return pd.concat(df, ignore_index=True)
It failed again with the pool.map, but one strange thing I noticed. Each time it fails it does so around 12,500 tickers (total is around 23,000 tickers) Similar error:
Traceback (most recent call last):
File "C:/Users/MLUY/AppData/Roaming/JetBrains/PyCharmCE2020.1/scratches/Equity_naive.py", line 21, in <module>
data = pool.map(download_data, TICKERS)
File "C:UsersMLUYAppDataLocalProgramsPythonPython37-32libmultiprocessingpool.py", line 268, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "C:UsersMLUYAppDataLocalProgramsPythonPython37-32libmultiprocessingpool.py", line 657, in get
raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x078D1BF0>'. Reason: 'TypeError("cannot serialize '_io.BufferedReader' object")'
Process finished with exit code 1
I get the tickers also from a API call https://financialmodelingprep.com/api/v3/financial-statement-symbol-lists?apikey=demo (I noticed it does not work without subscription), I wanted to attach the data it as a csv file but I dont have sufficient rights. I dont think its a good idea to paste the returned data here…
I tried adding time.sleep(0.2) before return as suggested, but again I ge the same error at ticker 12,510. Strange everytime its around the same location. As there are multiple processes going on I cannot see at what point its breaking
Traceback (most recent call last):
File "C:/Users/MLUY/AppData/Roaming/JetBrains/PyCharmCE2020.1/scratches/Equity_naive.py", line 24, in <module>
data = pool.map(download_data, TICKERS)
File "C:UsersMLUYAppDataLocalProgramsPythonPython37-32libmultiprocessingpool.py", line 268, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "C:UsersMLUYAppDataLocalProgramsPythonPython37-32libmultiprocessingpool.py", line 657, in get
raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x00F32C90>'. Reason: 'TypeError("cannot serialize '_io.BufferedReader' object")'
Process finished with exit code 1
Происходит что-то очень , очень странное, я разделил данные на куски 10,000 / 5,000 / 4,000 и 2000, и каждый раз, когда код прерывает примерно 100 тикеров с конца. Очевидно, что происходит что-то не то
import time
import pandas as pd
import multiprocessing
# get tickers from your csv
df=pd.read_csv('D:/Verhuizen/Pensioen/All_Symbols.csv',header=None)
# setting the Dataframe to a list (in total 23,000 tickers)
df=df[0]
TICKERS=df.tolist()
#Select how many tickers I want
TICKERS=TICKERS[0:2000]
BASEURL = "https://financialmodelingprep.com/api/v3/income-statement/{}?limit=120amp;apikey=demo"
def download_data(symbol):
print(symbol)
# do stuff here
read_str = BASEURL.format(symbol)
df = pd.read_json(read_str)
#time.sleep(0.2)
return df
if __name__ == "__main__":
with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
data = pool.map(download_data, TICKERS)
df = pd.concat(data).set_index(["date", "symbol"]).sort_index()
df.to_csv('D:/verhuizen/pensioen/Income_2000.csv')
В этом конкретном примере код прерывается в позиции 1903
RPAI
Traceback (most recent call last):
File "C:/Users/MLUY/AppData/Roaming/JetBrains/PyCharmCE2020.1/scratches/Equity_testing.py", line 27, in <module>
data = pool.map(download_data, TICKERS)
File "C:UsersMLUYAppDataLocalProgramsPythonPython37-32libmultiprocessingpool.py", line 268, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "C:UsersMLUYAppDataLocalProgramsPythonPython37-32libmultiprocessingpool.py", line 657, in get
raise self._value
multiprocessing.pool.MaybeEncodingError: Error sending result: '<multiprocessing.pool.ExceptionWithTraceback object at 0x0793EAF0>'. Reason: 'TypeError("cannot serialize '_io.BufferedReader' object")'
Комментарии:
1. Не могли бы вы скопировать/вставить содержимое вашей
download_data
функции, пожалуйста?2. Он продолжает выдавать ту же ошибку, код точно такой, как указано: def download_data(pool_id, символы): df = [] для символа в символах: печать(«[{:02}]: {}».формат(идентификатор пула, символ)) #сделайте что-нибудь здесь read_str = БАЗОВЫЙ формат(символ) df.добавьте(pd.read_json(read_str)) #df.добавьте(pd.read_json(fake_data(символ))) верните pd.concat(df, ignore_index=True)
3. Используйте вторую более простую версию с
pool.map
вместо первой сpool.starmap
. Мы доберемся туда! Я думаю, что у вас есть ограничение скорости от вашего api.4. Это снова не удалось с картой pool.map, но я заметил одну странную вещь. Каждый раз, когда он терпит неудачу, он делает это примерно с 12 500 тикерами (всего около 23 000 тикеров). Я получаю тикеры из вызова API , я прикреплю их в виде csv-файла
5. Вызовы API ограничены в зависимости от подписки, за исключением корпоративного плана. Если каждый запрос занимает 100 мс, вы можете совершать 600 звонков в минуту. Тем не менее, вы не должны превышать 300 звонков для стартового плана, но он подходит для профессионального ( Вы можете ввести задержку с
time.sleep(0.2)
«доreturn
«.
Ответ №1:
Первая оптимизация заключается в том, чтобы избежать объединения фрейма данных на каждой итерации.
Вы можете попробовать что-то в этом роде:
url = "https://financialmodelingprep.com/api/v3/income-statement/{}?limit=120amp;apikey=demo"
df = []
for symbol in tickers["symbol"]:
read_str = url.format(symbol)
df.append(pd.read_json(read_str))
df = pd.concat(df, ignore_index=True)
Если этого недостаточно, мы посмотрим , как использовать async
, threading
или multiprocessing
.
Изменить:
Приведенный ниже код может выполнить эту работу:
import pandas as pd
import numpy as np
import multiprocessing
import time
import random
PROCESSES = 4 # number of parallel process
CHUNKS = 6 # one process handle n symbols
# get tickers from your csv
TICKERS = ["BCDA", "WBAI", "NM", "ZKIN", "TNXP", "FLY", "MYSZ", "GASX", "SAVA", "GCE",
"XNET", "SRAX", "SINO", "LPCN", "XYF", "SNSS", "DRAD", "WLFC", "OILD", "JFIN",
"TAOP", "PIC", "DIVC", "MKGI", "CCNC", "AEI", "ZCMD", "YVR", "OCG", "IMTE",
"AZRX", "LIZI", "ORSN", "ASPU", "SHLL", "INOD", "NEXI", "INR", "SLN", "RHE-PA",
"MAX", "ARRY", "BDGE", "TOTA", "PFMT", "AMRH", "IDN", "OIS", "RMG", "IMV",
"CHFS", "SUMR", "NRG", "ULBR", "SJI", "HOML", "AMJL", "RUBY", "KBLMU", "ELP"]
# create a list of n sublist
TICKERS = [TICKERS[i:i CHUNKS] for i in range(0, len(TICKERS), CHUNKS)]
BASEURL = "https://financialmodelingprep.com/api/v3/income-statement/{}?limit=120amp;apikey=demo"
def fake_data(symbol):
dti = pd.date_range("1985", "2020", freq="Y")
df = pd.DataFrame({"date": dti, "symbol": symbol,
"A": np.random.randint(0, 100, size=len(dti)),
"B": np.random.randint(0, 100, size=len(dti))})
time.sleep(random.random()) # to simulate network delay
return df.to_json()
def download_data(pool_id, symbols):
df = []
for symbol in symbols:
print("[{:02}]: {}".format(pool_id, symbol))
# do stuff here
# read_str = BASEURL.format(symbol)
# df.append(pd.read_json(read_str))
df.append(pd.read_json(fake_data(symbol)))
return pd.concat(df, ignore_index=True)
if __name__ == "__main__":
with multiprocessing.Pool(PROCESSES) as pool:
data = pool.starmap(download_data, enumerate(TICKERS, start=1))
df = pd.concat(data).set_index(["date", "symbol"]).sort_index()
В этом примере я разделил список тикеров на подсписки для каждого процесса, который извлекает данные для нескольких символов и ограничивает накладные расходы, связанные с созданием и уничтожением процессов.
Задержка предназначена для имитации времени отклика от сетевого подключения и выделения многопроцессорного поведения.
Правка 2: более простая, но наивная версия для ваших нужд
import pandas as pd
import multiprocessing
# get tickers from your csv
TICKERS = ["BCDA", "WBAI", "NM", "ZKIN", "TNXP", "FLY", "MYSZ", "GASX", "SAVA", "GCE",
"XNET", "SRAX", "SINO", "LPCN", "XYF", "SNSS", "DRAD", "WLFC", "OILD", "JFIN",
"TAOP", "PIC", "DIVC", "MKGI", "CCNC", "AEI", "ZCMD", "YVR", "OCG", "IMTE",
"AZRX", "LIZI", "ORSN", "ASPU", "SHLL", "INOD", "NEXI", "INR", "SLN", "RHE-PA",
"MAX", "ARRY", "BDGE", "TOTA", "PFMT", "AMRH", "IDN", "OIS", "RMG", "IMV",
"CHFS", "SUMR", "NRG", "ULBR", "SJI", "HOML", "AMJL", "RUBY", "KBLMU", "ELP"]
BASEURL = "https://financialmodelingprep.com/api/v3/income-statement/{}?limit=120amp;apikey=demo"
def download_data(symbol):
print(symbol)
# do stuff here
read_str = BASEURL.format(symbol)
df = pd.read_json(read_str)
return df
if __name__ == "__main__":
with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
data = pool.map(download_data, TICKERS)
df = pd.concat(data).set_index(["date", "symbol"]).sort_index()
Примечание о pool.map
: для каждого символа в TICKERS
создайте процесс и вызовите функцию download_data
.
Комментарии:
1. Работает (и я часто так работаю), но я бы не стал называть список
df
, ноldf
(для списка кадров данных), чтобы избежать путаницы.2. Вы используете 2 переменные
ldf
иdf
поэтому потребляете в два раза больше памяти.df
поскольку список используется только в цикле, тоdf
в качестве фрейма данных его можно использовать в любом месте кода позже, поэтому я предпочитаю перезаписывать временный список конечным фреймом данных.3. Спасибо, есть некоторое увеличение скорости на 15%, но я надеялся на множество, Как бы я использовал многопроцессорную обработку и какого увеличения скорости я мог там ожидать?
4. У меня около 10 000 символов, и каждый символ имеет три отдельных веб — вызова, чтобы получить json с тремя финансовыми отчетами. Но, глядя на ваш код, ух ты, я думаю, что я немного не в себе…. Я (думаю) Я понимаю, почему вы имитируете время отклика, но я действительно не знаю, как это реализовать сейчас. Я вижу, вы прокомментировали код, который вызывает мои реальные данные. Но как мне на самом деле вызвать функцию download_data?? Я полагаю, что pool_id-это один из 4 процессов? Если бы я мог заставить это работать, это был бы черный ящик, который мне действительно нужно изучить!
5. Вау, удивительно быстро!, все работает! , большое спасибо. Сначала я попытался запустить в ноутбуке jupyter, но почему-то это не сработало, но когда я запустил его в Pycharm IDE, он работал нормально!. Я не понимаю большую часть кода, но я получаю результаты. Я попытался изменить процессы (установлено на 4) и фрагменты (установлено на 6), но для каждого изменения (вверх) это приводит к ошибке. Есть ли максимум для этих размеров, и все еще есть способ оптимизировать скорость, регулируя их, или это сейчас максимальная скорость?