Récupérez tout l'historique d'une crypto rapidement sur n'importe quel échange grace à l'asynchrone en python

Crypto Robot14 février 202318 min

Que ce soit sur Binance, Bybit ou n'importe quel autre échange il y a une limite quant à la quantité de bougies que vous pouvez récupérer en une seule requête. Grace à l'asynchrone avec python, dans cet article nous vous proposons une solution pour télécharger l'historique d'une ou plusieurs crypto-monnaies très rapidement.

Preview_image

Comment récupérer toutes les bougies d'une crypto rapidement ?

Un problème récurrant dans l'interaction par API avec les échangeurs crypto comme Binance ou Bybit est la limite du nombre de bougies que l'on peut récupérer par requête.

En effet si l'on prend l'exemple de bybit, nous sommes limité à 200 bougies par requête. Ce que l'on appelle bougie sont des données OHLCV (Open, High, Low, Close, Volume) qui représente l'évolution du cours d'un actif comme une crypto-monnaie.

Cela peut vite de venir problématique, imaginons que l'on ai un script de trading automatisé qui utilise une moyenne mobile 500 périodes, cela est impossible car nous n'avons pu récupérer que 200 données. Encore pire si nous souhaitons réalisé un backtest (simulation sur le passé). Dans ce cas nous serons limité à simuler sur les 200 dernières heures si nous utilisons des données heure par heure.

C'est pourquoi dans cette article nous allons vous proposer un programme python vous permettant de récupérer autant d'historique que voulu (jusqu'à minute par minute) et cela de manière très rapide.

Notez que dans cet article l'exemple est bybit. Cependant les exemples ici sont valables pour binance ou encore bitget ce dernier ne retourne que 100 bougies.

Quelle est la technique à utiliser pour récupérer plus de données sur les marchés ?

Etant donné que le "rate limit" sur l'API bybit (et sur la majorité des échanges) est relativement élevé sur la méthode GET.

Capture d’écran 2023-02-14 à 09.23.59.png

L'idée de notre code python est qu'a partir d'une date de début, et d'une date fin ainsi que d'un intervalle, il soit en mesure de construire les différentes requêtes en fonction de l'échange utilisé.

Pour que ce soit plus simple à comprendre Imaginons la chose suivante :

Nous sommes maintenant une nouvelle plateforme d'échange de crypto-monnaie. Nous fournissons à nos utilisateurs la possibilité de récupéré les informations de différentes paires au travers d'une API. Nous avons fixé une limite de requête à 100 requêtes par seconde et par utilisateur. Néanmoins nous ne retournons que les 11 dernières bougies, et uniquement journalière. Notre API a besoin d'obligatoirement 3 paramètres pour fonctionner, la paire, la date de début, et la date de fin.

Supposons désormais un utilisateur, celui-ci a besoin de récupérer 12 jours de données à partir du 1 février 2023, et pas un de plus. Malheureusement pour lui il ne peut pas faire la requête suivante : https://crypto-robot.com/xchange/api/market?symbol=BTCUSDT&date_debut=01-02-2023&date_fin=12-02-2023

il devra donc faire plusieurs requêtes à savoir :

https://crypto-robot.com/xchange/api/market?symbol=BTCUSDT&date_debut=01-02-2023&date_fin=11-02-2023

https://crypto-robot.com/xchange/api/market?symbol=BTCUSDT&date_debut=12-02-2023&date_fin=12-02-2023

Voila c'est exactement ce que va faire notre code python. Construire les requêtes afin de les enchainer le plus rapidement possible. Pour faire cela nous allons utiliser la programmation asynchrone.

C'est quoi la programmation asynchrone ?

Nous allons tenter de faire une rapide introduction à ce qu'est l'asynchrone.

Un programme qui utilise cette technique a pour but de passer le moins de temps possible à attendre, pour faire cela notre ce programme exécute plusieurs tâches en même temps. On dit qu'il s'exécute de manière concurrente.

Reprenons l'exemple de tout à l'heure avec notre échange fictif. Nous avons un total de deux requêtes pour récupérer nos données. En programmation classique (synchrone) il aurait fonctionné de la façon suivante :

  • Envoi de la première requête
  • Attente de la réponse de l'échange
  • Récupérer les données
  • Envoi de la seconde requête
  • Attente de la réponse de l'échange
  • Récupérer les données
  • Enregistrer les données dans un csv

Dans notre code en asynchrone son schéma de fonctionnement est le suivant:

  • Envoi de la première requête
  • Envoi de la seconde requête
  • Récupérer les données de la première requête
  • Récupérer les données de la seconde requête
  • Enregistrer les données dans un csv

Vous remarquerez que lorsque qu'on utilise l'asynchrone nous nous affranchissons du temps d'attente, ce qui nous permet ne pas bloquer notre programme et de faire autre chose en attendant que la plateforme nous envoi les données.

Programmer en asynchrone avec python

Voici le code python que nous vous proposons :

import asyncio
from posixpath import dirname
from pathlib import Path
import ccxt.async_support as ccxt
import pytz
import pandas as pd
import os
from datetime import datetime, timedelta
from tqdm.auto import tqdm
import itertools
import timeit
import time


class ExchangeDataManager:

    # Liste des exchanges à supporter
    CCXT_EXCHANGES = {
        "binance": {
            "ccxt_object": ccxt.binance(config={"enableRateLimit": True}),
            "limit_size_request": 1000,
        },
        "binanceusdm": {
            "ccxt_object": ccxt.binanceusdm(config={"enableRateLimit": True}),
            "limit_size_request": 1000,
        },
        "kucoin": {
            "ccxt_object": ccxt.kucoin(config={"enableRateLimit": True}),
            "limit_size_request": 1500,
        },
        "hitbtc": {
            "ccxt_object": ccxt.hitbtc(config={"enableRateLimit": True}),
            "limit_size_request": 1000,
        },
        "bitfinex": {
            "ccxt_object": ccxt.bitfinex(config={"enableRateLimit": True}),
            "limit_size_request": 10000,
        },
        "bybit": {
            "ccxt_object": ccxt.bybit(config={"enableRateLimit": True}),
            "limit_size_request": 200,
        },
        "bitget": {
            "ccxt_object": ccxt.bitget(config={"enableRateLimit": True}),
            "limit_size_request": 100,
        },
        "bitmart": {
            "ccxt_object": ccxt.bitmart(config={"enableRateLimit": True}),
            "limit_size_request": 500,     
        }
    }

    # Liste des intervalles à supporter
    INTERVALS = {
        "1m": {"timedelta": timedelta(minutes=1), "interval_ms": 60000},
        "2m": {"timedelta": timedelta(minutes=2), "interval_ms": 120000},
        "5m": {"timedelta": timedelta(minutes=5), "interval_ms": 300000},
        "15m": {"timedelta": timedelta(minutes=15), "interval_ms": 900000},
        "30m": {"timedelta": timedelta(minutes=30), "interval_ms": 1800000},
        "1h": {"timedelta": timedelta(hours=1), "interval_ms": 3600000},
        "2h": {"timedelta": timedelta(hours=2), "interval_ms": 7200000},
        "4h": {"timedelta": timedelta(hours=4), "interval_ms": 14400000},
        "12h": {"timedelta": timedelta(hours=12), "interval_ms": 43200000},
        "1d": {"timedelta": timedelta(days=1), "interval_ms": 86400000},
        "1w": {"timedelta": timedelta(weeks=1), "interval_ms": 604800000},
        "1M": {"timedelta": timedelta(days=30), "interval_ms": 2629746000},
    }

    def __init__(self, exchange_name, path_download="./") -> None:
        """This method create an ExchangeDataManager object
           Args:
               exchange_name (_type_): the exchange you need for download or load data
               path_download (str, optional): directory path (default "./")

           Raises:
               NotImplementedError: if the exchange is unsupported
        """
        self.exchange_name = exchange_name.lower()
        self.path_download = path_download
        try:
            self.exchange_dict = ExchangeDataManager.CCXT_EXCHANGES[self.exchange_name]
        except Exception:
            raise NotImplementedError(
                f"L'échange {self.exchange_name} n'est pas supporté"
            )
        self.intervals_dict = ExchangeDataManager.INTERVALS

        self.exchange = self.exchange_dict["ccxt_object"]

        self.path_data = str(
            Path(
                os.path.join(dirname(__file__), self.path_download, self.exchange_name)
            ).resolve()
        )
        os.makedirs(self.path_data, exist_ok=True)
        self.pbar = None

    def load_data(
        self, coin, interval, start_date="1990", end_date="2050"
    ) -> pd.DataFrame:
        """This method load the market data between 2 dates

            :param coin: symbol (ex: BTCUSDT)
            :param interval: interval between each point of data (ex: 1h)
            :param start_date: starting date (default 1990)
            :param end_date: end date (default 2050)
            :return pd.DataFrame
        """
        file_path = f"{self.path_data}/{interval}/"
        file_name = f"{file_path}{coin.replace('/', '-')}.csv"
        if not os.path.exists(file_name):
            raise FileNotFoundError(f"Le fichier {file_name} n'existe pas")

        df = pd.read_csv(file_name, index_col=0, parse_dates=True)
        df.index = pd.to_datetime(df.index, unit="ms")
        df = df.groupby(df.index).first()
        df = df.loc[start_date:end_date]
        df = df.iloc[:-1]

        return df

    async def download_data(
        self,
        coins,
        intervals,
        start_date="2017-01-01 00:00:00",
        end_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    ) -> None:
        """This method download the market data between 2 dates

            :param coins: list of symbols (ex: [BTCUSDT])
            :param intervals: list of intervals between each point of data (ex: [1h, 1w])
            :param start_date: starting date (ex:  2020-01-01 00:00:00) (default: "2017-01-01 00:00:00")
            :param end_date: end date (ex: 2023-01-01 01:00:00) (default: current timestamp)
            :return None
        """

        await self.exchange.load_markets()

        start_date = datetime.strptime(start_date, "%Y-%m-%d %H:%M:%S")
        end_date = datetime.strptime(end_date, "%Y-%m-%d %H:%M:%S")
   

        for interval in intervals:

            all_dt_intervals = list(
                self.create_intervals(
                    start_date, end_date, self.create_timedelta(interval)
                )
            )
   
            last_dt = all_dt_intervals[-1].astimezone(pytz.utc)
            end_timestamp = int(last_dt.timestamp() * 1000)
          
            for coin in coins:
                print(
                    f"\tRécupération pour la paire {coin} en timeframe {interval} sur l'exchange {self.exchange_name}..."
                )

                file_path = f"{self.path_data}/{interval}/"
                os.makedirs(file_path, exist_ok=True)
                file_name = f"{file_path}{coin.replace('/', '-')}.csv"

                dt_or_false = await self.is_data_missing(file_name, last_dt, str(start_date))
                
                if dt_or_false:

                    tasks = []
                    current_timestamp = int(dt_or_false.timestamp() * 1000)
                    turn = 0
                 
                    while current_timestamp <= end_timestamp:
                        tasks.append(
                            asyncio.create_task(
                                self.download_tf(coin, interval, current_timestamp)
                            )
                        )
                        current_timestamp = (
                            self.exchange_dict["limit_size_request"]
                            * self.intervals_dict[interval]["interval_ms"]
                            + current_timestamp
                        )

                    self.pbar = tqdm(tasks)
                    results = await asyncio.gather(*tasks)
                    await self.exchange.close()
                  
                    results = list(itertools.chain(*results))
               
                    self.pbar.close()

                    if results:
                        final = pd.DataFrame(
                            results,
                            columns=["date", "open", "high", "low", "close", "volume"],
                        )
                        final.set_index('date', drop=False, inplace=True)
                        final = final[~final.index.duplicated(keep='first')]
                        
                        
                        flag_header = (
                            ("a", False) if os.path.exists(file_name) else ("w", True)
                        )
                        with open(file_name, mode=flag_header[0]) as f:
                            final.to_csv(
                                path_or_buf=f, header=flag_header[1], index=False
                            )
                    else:
                        print(f"\tPas de données pour {coin} en {interval} sur cette période")
                else:
                    print("\tDonnées déjà récupérées")

                #print("\033[H\033[J", end="")

    async def download_tf(self, coin, interval, start_timestamp) -> list:
        tests = 0
        while True:
            try:
                r = await self.exchange.fetch_ohlcv(
                    symbol=coin,
                    timeframe=interval,
                    since=start_timestamp,
                    limit=self.exchange_dict["limit_size_request"],
                )
                self.pbar.update(1)
                return r
            except Exception:
                tests += 1
                if tests == 3:
                    raise TooManyError

    async def is_data_missing(self, file_name, last_dt, start_date) -> bool | datetime:

        await self.exchange.close()

        if os.path.isfile(file_name):
            df = pd.read_csv(file_name, index_col=0, parse_dates=True)
            df.index = pd.to_datetime(df.index, unit="ms")
            df = df.groupby(df.index).first()

            if pytz.utc.localize(df.index[-1]) >= last_dt:
                return False
        else:
            return datetime.fromisoformat(start_date)
        return pytz.utc.localize(df.index[-2])

    def create_intervals(self, start_date, end_date, delta):
        current = start_date
        while current <= end_date:
            yield current
            current += delta

    def create_timedelta(self, interval) -> int:
        try:
            return self.intervals_dict[interval]["timedelta"]
        except Exception:
            raise ValueError(f"Intervalle {interval} inconnu")

    def explore_data(self) -> pd.DataFrame:
        files_data = []
        for path, subdirs, files in os.walk(self.path_download):
            for name in files:
                if os.path.join(path, name).endswith(".csv"):
                    current_file = os.path.join(path, name)
                    file_split = current_file.split("\\")
                    try:
                        df_file = pd.read_csv(current_file)
                    except Exception:
                        continue

                    files_data.append(
                        {
                            "exchange": file_split[1],
                            "timeframe": file_split[2],
                            "pair": file_split[3][:-4],
                            "occurences": len(df_file),
                            "start_date": str(
                                datetime.fromtimestamp(df_file.iloc[0]["date"] / 1000)
                            ),
                            "end_date": str(
                                datetime.fromtimestamp(df_file.iloc[-1]["date"] / 1000)
                            ),
                        }
                    )

        return pd.DataFrame(files_data)


class TooManyError(Exception):
    pass


exchange = ExchangeDataManager(
    exchange_name="binance", path_download="../database/exchanges"
)

data = asyncio.run(exchange.download_data(["SOLUSDT", "BTCUSDT" ], ["1h"], start_date="2017-08-17 00:00:00"))

Retrouvez le code sur notre github : https://github.com/CryptoRobotFr/backtest_tools

Notre code intègre la bibliothèque ccxt pour une gestion efficace de plusieurs plateformes de crypto-monnaies.

Nous vos proposons un petit guide afin d'utiliser ce code.

Télécharger l'historique d'une ou plusieurs crypto

Ce code permet le téléchargement d'historique de plusieurs cryptos sur plusieurs intervalles.

exchange = ExchangeDataManager(exchange_name="binance", path_download="./database/exchanges")

asyncio.run(exchange.download_data(["BTCUSDT"], ["1m"], start_date="2020-01-01 00:00:00"))

Dans cet exemple nous téléchargeons les données du Bitcoin en 1 minute sur binance. Les données seront stockées, dans le dossier "database/exchanges" situé à la racine de votre script. Supposons désormais que nous souhaitons télécharger les données sur 1h et nous voulons également télécharger l'historique d'Ethereum toujours sur la stable coin USDT. Notre code sera :

asyncio.run(exchange.download_data(["BTCUSDT", "ETHUSDT"], ["1m", "1h"], start_date="2020-01-01 00:00:00"))

Comment charger l'historique dans un dataframe ?

eth_histo = exchange.load_data('ETHUSDT', '1h', start_date=""2020-01-01")
btc_histo = exchange.load_data('BTCUSDT', '1m', start_date=""2020-01-01")

Comment récupérer l'historique sur une autre plateforme ?

La variable "CCXT_EXCHANGES" est cruciale dans ce code, car elle permet d'ajouter facilement des plateformes de trading prises en charge par ccxt. Ne paniquez pas si votre plateforme n'apparaît pas, on vous montre comment l'ajouter rapidement.

La première étape consiste à se rendre sur le dépot github de la librairie de trading. Il s'agit de vérifier que votre plateforme est supportée par ccxt. https://github.com/ccxt/ccxt/wiki/Exchange-Markets

Pour notre exemple nous avons sélectionné de manière totalement hasardeuse l'échange bitmart.

Il suffit maintenant de se rendre dans la documentation de l'api de bitmart, et de chercher combien de bougies maximum peuvent être retournées. Si vous n'êtes pas habitué à chercher dans les documentations vous pouvez utiliser la barre de recherche, et chercher des mots clés tel que : "Market Data", "K-Line" ...

Sur bitmart par exemple c'est 500 bougies.

Capture d’écran 2023-02-27 à 09.16.21.png

Une fois l'information en poche il suffit de rajouter à notre dictionnaire "CCXT_EXCHANGES" ceci :

CCXT_EXCHANGES = {
... ,
"bitmart": {
            "ccxt_object": ccxt.bitmart(config={"enableRateLimit": True}),
            "limit_size_request": 500,
        }
}

exchange = ExchangeDataManager(exchange_name="bitmart", path_download="./database/exchanges")

Avec ce tutoriel simple, vous pouvez télécharger rapidement les données du Bitcoin et d'Ethereum ou tout autre crypto sur énormement d'échanges en utilisant Python. Cela vous sera très utile pour effectuer une analyse de marché ou pour créer des stratégies de trading automatisées. N'hésitez pas à personnaliser le code en fonction de vos besoins spécifiques.

Si vous avez des questions n'hésitez pas à vous rendre sur notre discord : https://discord.gg/VQ3Cuv3z