Запрос нескольких символов Python от интерактивных брокеров

#python #pandas #api #interactive-brokers #ib-api

#python #pandas #API #interactive-brokers #ib-api

Вопрос:

Я смог собрать скрипт из документации / примеров и форумов IB на этом сайте. Я получаю желаемый результат для одного символа, однако, если я использую список акций, я не могу найти способ передать символ тикера в выходной файл DF. Мой обходной путь заключался в создании словаря, который использует последовательность списков (см. Ниже), однако вывод из API IB каждый раз незначительно меняется, что делает символы в основном бессмысленными. Список, который я использую ниже, обычно содержит более 20 имен, но может измениться, я сократил его, чтобы упростить просмотр.

@Brian / и / или другие разработчики, если есть способ создать уникальный идентификатор / последовательность для каждого вызова символа и привязать его к возвращаемым данным, я могу затем использовать словарь для применения символа. На другом форуме вы передали строку, где n_id = n_id 1, если это может быть применено и связано с каждым конкретным вызовом, который выполняется в порядке списка, тогда это может сработать?

 from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import pandas as pd
import threading
import time
from datetime import timedelta
import datetime

class IBapi(EWrapper, EClient):
    def __init__(self):
        EClient.__init__(self, self)
        self.data = [] #Initialize variable to store candle

    def historicalData(self, reqId, bar):
        #print(f'Time: {bar.date} Close: {bar.close} Volume: {bar.volume}',reqId)
        self.data.append([bar.date, bar.close, bar.volume, reqId])
     
def run_loop():
    app.run()

app = IBapi()
app.connect('127.0.0.1', 7496, 123)

#Start the socket in a thread
api_thread = threading.Thread(target=run_loop, daemon=True)
api_thread.start()

time.sleep(1) #Sleep interval to allow time for connection to server

symbols = ['SPY','MSFT','GOOG','AAPL','QQQ','IWM','TSLA']

for sym in symbols:
    contract = Contract()
    contract.symbol = str(sym) 
    contract.secType = "STK"
    contract.exchange = "SMART"
    contract.currency = "USD"
    #contract.primaryExchange = "ISLAND"
    app.reqHistoricalData(1, contract, "", "1 D", "10 mins", "ADJUSTED_LAST", 1, 2, False, [])

 time.sleep(5) #sleep to allow enough time for data to be returned

df = pd.DataFrame(app.data, columns=['DateTime', 'ADJUSTED_LAST','Volume','reqId'])
df['DateTime'] = pd.to_datetime(df['DateTime'],unit='s') #,unit='s') 

df['Count'] = df.groupby('DateTime').cumcount() 1
sym_dict = {1:'SPY',2:'MSFT',3:'GOOG',4:'AAPL',5:'QQQ',6:'IWM',7:'TSLA'}

df['Ticker'] = df['Count'].map(sym_dict)

print(df)
 

#редактировать, добавляя подробности @Brian:

 from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import pandas as pd
import time
from datetime import timedelta
import datetime

start = datetime.datetime.utcnow()

class IBapi(EWrapper, EClient):
    def __init__(self):
        EClient.__init__(self, self)
        self.data = [] 

def error(self, reqId, errorCode, errorString):
    print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)

def historicalData(self, reqId, bar):
    self.data.append([bar.date, bar.close, bar.volume, sym_dict[reqId]])
    print("HistoricalData. ReqId:", sym_dict[reqId], "BarData.", bar)
 
# include this callback to track progress and maybe disconnectwhen all are finished
def historicalDataEnd(self, reqId: int, start: str, end: str):
    print("finished", sym_dict[reqId])

def run_loop():
    app.run()

app = IBapi()
app.connect('127.0.0.1', 7496, 123)

# you should wait for nextValidId instead of sleeping, what if it takes more than 1 second? @john: how do i do this?
time.sleep(5) @john: how do i do this? wait for nextValidId?

symbols = ['SPY','MSFT','GOOG','AAPL','QQQ','IWM','TSLA']

reqId = 1
sym_dict = {}
for sym in symbols:
    contract = Contract()
    contract.symbol = str(sym) 
    sym_dict[reqId] = sym
    contract.secType = "STK"
    contract.exchange = "SMART"
    contract.currency = "USD"
    #contract.primaryExchange = "ISLAND" # you may need this for msft
    app.reqHistoricalData(reqId, contract, "", "1 D", "10 mins", "ADJUSTED_LAST", 1, 2, False, [])
    reqId  = 1
    time.sleep(5)

df = pd.DataFrame(app.data, columns=['DateTime', 'ADJUSTED_LAST','Volume','sym'])
df['DateTime'] = pd.to_datetime(df['DateTime'],unit='s') #,unit='s') 
df = df.set_index(['sym','DateTime']).sort_index()
print(df)
app.disconnect()
 

Ответ №1:

Вам просто нужно поддерживать диктат запроса и символа.

Я не уверен, что один фрейм данных — лучший способ хранения ваших данных, но если вы это сделаете, то установите мультииндекс. Решите, сколько данных вам нужно и как вы собираетесь хранить их на диске, а затем определите структуру данных. Я предлагаю csv для скорости или sqlite для простоты. Pandas может обрабатывать либо.

Я удалил ваши комментарии и добавил несколько своих.

 from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import pandas as pd
import threading
import time
from datetime import timedelta
import datetime

# I added this code to get fake data, works wtihout tws running
from ibapi.common import BarData
from random import random
start = datetime.datetime.utcnow()
def fake_data(reqId, ib):
    last = reqId*10
    for i in range(60, 0, -10):
        bar = BarData();
        bar.date = start - timedelta(minutes=i)
        last  = random() - 0.5
        bar.close = last
        bar.volume = reqId * 1000
        ib.historicalData(reqId, bar)
    ib.historicalDataEnd(reqId,"","")
    
class IBapi(EWrapper, EClient):
    def __init__(self):
        EClient.__init__(self, self)
        self.data = [] 

    #always include this for important messages, also turn on api logging in TWS/IBG    
    def error(self, reqId, errorCode, errorString):
        print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)

    def historicalData(self, reqId, bar):
        self.data.append([bar.date, bar.close, bar.volume, sym_dict[reqId]])
     
    # include this callback to track progress and maybe disconnectwhen all are finished
    def historicalDataEnd(self, reqId: int, start: str, end: str):
        print("finished", sym_dict[reqId])
        
def run_loop():
    app.run()

app = IBapi()
app.connect('127.0.0.1', 7496, 123)

# threading is needed only if you plan to interact after run is called
# this is a good way if you use a ui like jupyter
api_thread = threading.Thread(target=run_loop, daemon=True)
api_thread.start()

# you should wait for nextValidId instead of sleeping, what if it takes more than 1 second?
time.sleep(1)

symbols = ['SPY','MSFT','GOOG','AAPL','QQQ','IWM','TSLA']

reqId = 1
sym_dict = {}
for sym in symbols:
    contract = Contract()
    contract.symbol = str(sym) 
    sym_dict[reqId] = sym
    contract.secType = "STK"
    contract.exchange = "SMART"
    contract.currency = "USD"
    #contract.primaryExchange = "ISLAND" # you may need this for msft
    #app.reqHistoricalData(reqId, contract, "", "1 D", "10 mins", "ADJUSTED_LAST", 1, 2, False, [])
    fake_data(reqId, app)
    reqId  = 1
    #now you need to sleep(10) to make sure you don't get a pacing error for too many requests
    
# don't sleep, use historicalDataEnd to know when finished
time.sleep(5)

df = pd.DataFrame(app.data, columns=['DateTime', 'ADJUSTED_LAST','Volume','sym'])
df['DateTime'] = pd.to_datetime(df['DateTime'],unit='s')

#make an index and sort
df = df.set_index(['sym','DateTime']).sort_index()
# now you can use the indexes
print(df.loc[("SPY","2021")])

#don't forget to disconnect somewhere or the clientId will still be in use
 

Комментарии:

1. большое спасибо за рецензирование! Это невероятно полезно, однако, я думаю, что я что-то упускаю при применении вашего ответа к фактическим историческим данным. Случайные данные для тестирования, которые вы применили, очень классные, и я обязательно использую их для других начинаний. Я собираюсь добавить то, что я пытался применить, поскольку мне не хватает чего-то простого в переводе

2. Возможно, я допустил ошибку. У меня просто какое-то время нет TWS в моем местоположении. Просто используйте свой запрос исторических данных вместо поддельных данных.

3. Потрясающе! Не беспокойтесь — я очень ценю помощь — я в конечном итоге заставлю ее работать! Я занимаюсь торговлей, а не разработкой! Я все о том, чтобы найти 2k способов, которыми лампочка не работает, ха-ха. Согласно вашему предыдущему запросу — я намерен применить выходные данные с помощью некоторого анализа pandas, а затем отправить их мне в виде сигналов либо по электронной почте, либо в уведомлении slack — еще раз спасибо за вашу помощь!!