It’s always tempting to dive into model building and tinker with model deployment, but for a successful ML project, having a solid ETL pipeline is essential. After all, it’s garbage in, garbage out! A couple of years ago, we discovered some discrepancies between the crypto prices in Turkey and Europe. We were quite excited and hastily wrote some spaghetti code to operate a bot that engaged in arbitrage trading. We made some money but encountered legal challenges. Also, as time went by, the arbitrage business became less profitable and eventually faded away.
We wanted to revive this project, but this time we didn’t want to limit ourselves to risk-free arbitrage. We aimed for statistical arbitrage and stock price prediction to engage in fully-fledged algorithmic trading. However, we needed a reliable data collection pipeline for backtesting. That’s why, in this blog post, I am sharing our simplified architectural strategy to collect data from various crypto exchanges in Europe and Turkey. At the end of this post, you will find a link to the repository where you can explore the code in detail.
We aimed to keep things simple and affordable. In our case, we required instantaneous order book data. Ideally, one could use Amazon Timestream, which is specially designed for time-series data. In such a setup, as soon as the data from an exchange arrives through a websocket, it can be pushed to Timestream.
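We did not build this real-time path ourselves, but to make the idea concrete, here is a minimal sketch of what pushing a single order-book snapshot into Timestream with boto3 could look like. The database, table, and record fields below are hypothetical placeholders, not names from our project.

import time

import boto3

# Hypothetical database and table names, used only for illustration.
TIMESTREAM_DB = "crypto_md"
TIMESTREAM_TABLE = "orderbook"

ts_client = boto3.client("timestream-write", region_name="eu-west-1")


def push_orderbook_snapshot(exchange: str, pair: str, best_bid: float, best_ask: float):
    """Write one order-book snapshot to Timestream as two measures."""
    now_ms = str(int(time.time() * 1000))
    dimensions = [
        {"Name": "exchange", "Value": exchange},
        {"Name": "pair", "Value": pair},
    ]
    records = [
        {"Dimensions": dimensions, "MeasureName": "best_bid",
         "MeasureValue": str(best_bid), "MeasureValueType": "DOUBLE", "Time": now_ms},
        {"Dimensions": dimensions, "MeasureName": "best_ask",
         "MeasureValue": str(best_ask), "MeasureValueType": "DOUBLE", "Time": now_ms},
    ]
    ts_client.write_records(DatabaseName=TIMESTREAM_DB,
                            TableName=TIMESTREAM_TABLE,
                            Records=records)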
However, we didn’t need real-time capability during the research phase, so we opted for a solution that was simple yet efficient. We still connected to exchanges through websocket or async HTTP requests, but we stored the data in Parquet files in S3. To minimize write operations to S3, we maintained a data buffer. Once the buffer was full, we wrote the buffered data to S3 and refreshed the buffer. Files were partitioned by the exchange and time, enabling easy access at later stages. And finally, we deployed this data collector on an EC2 instance within the free tier. You can see how inexpensive and straightforward it is :)
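To make the buffering idea concrete, here is a minimal sketch of the buffer-then-flush pattern. It assumes the awswrangler library for the partitioned Parquet write and uses hypothetical bucket, prefix, and buffer-size values, so treat it as an illustration of the approach rather than our exact implementation.

import datetime as dt
from typing import List

import awswrangler as wr
import pandas as pd

BUCKET = "my-orderbook-bucket"  # hypothetical bucket name
PREFIX = "orderbooks"           # hypothetical key prefix
BUFFER_SIZE = 500               # flush to S3 once this many records are buffered

buffer: List[dict] = []


def append_and_maybe_flush(record: dict):
    """Buffer one order-book record and flush the buffer to S3 once it is full."""
    # Each record is expected to carry an "exchange" field for partitioning.
    buffer.append(record)
    if len(buffer) >= BUFFER_SIZE:
        df = pd.DataFrame(buffer)
        df["date"] = dt.datetime.utcnow().strftime("%Y-%m-%d")
        # Write the buffered records as Parquet, partitioned by exchange and date
        wr.s3.to_parquet(
            df=df,
            path=f"s3://{BUCKET}/{PREFIX}/",
            dataset=True,
            partition_cols=["exchange", "date"],
        )
        buffer.clear()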
We understand that setting everything up through the console can be quite cumbersome. In the end, there are many settings to configure. That’s why we are using AWS CloudFormation as our infrastructure-as-code tool. Later, we will explore how we use GitHub Actions and CloudFormation to establish a smooth CD pipeline!
In summary, we use S3 and EC2 as our cloud tech stack.
Each exchange comes with its own API and even a Python wrapper. However, this poses a problem for us, as we aim for our data collection to be agnostic to the implementation details of different exchanges: there should be unifying methods and functions that work across all exchanges. That’s why we opted for interface design. We have the ExchangeInterface, which outlines the methods and attributes to be implemented by the concrete classes. In our setup, the concrete classes are, for instance, Bitvavo for the Bitvavo exchange and BTCTurk for the BTCTurk exchange.

Below is a glimpse of our ExchangeInterface:
from abc import ABC, abstractmethod
from typing import List


class ExchangeInterface(ABC):
    # Human-readable exchange name, set by each concrete class
    name = None
    # Mapping of websocket event types to their handler callbacks
    ws_handlers = None

    def __init__(self):
        pass
        # self.set_api_keys(public_key, private_key)

    @abstractmethod
    def fetch_orderbook(self, pair: str, limit: int = None) -> dict:
        pass

    @abstractmethod
    async def async_fetch_orderbook(self, pair: str, limit: int = None) -> dict:
        pass

    @abstractmethod
    def subscribe(self, event_types: List[str], pairs: List[str]):
        pass
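To show how the interface is meant to be used, here is a rough sketch of what a concrete class could look like. The REST endpoint, response handling, and the thread-based async fallback are simplified assumptions for illustration, not a copy of our actual Bitvavo implementation.

import asyncio
from typing import List

import requests


class Bitvavo(ExchangeInterface):
    name = "bitvavo"
    # Assumed base URL shape; check the exchange documentation for the real endpoints.
    BASE_URL = "https://api.bitvavo.com/v2"

    def fetch_orderbook(self, pair: str, limit: int = None) -> dict:
        # e.g. pair = "BTC-EUR"
        params = {"depth": limit} if limit else {}
        response = requests.get(f"{self.BASE_URL}/{pair}/book", params=params, timeout=10)
        response.raise_for_status()
        return response.json()

    async def async_fetch_orderbook(self, pair: str, limit: int = None) -> dict:
        # The real code uses an async HTTP client; offloading to a thread keeps
        # this sketch dependency-free.
        return await asyncio.to_thread(self.fetch_orderbook, pair, limit)

    def subscribe(self, event_types: List[str], pairs: List[str]):
        # Websocket subscriptions are omitted from this sketch.
        raise NotImplementedError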
Let’s also discuss our Writer Classes. We adopted a similar approach here. Perhaps in the future, we might consider saving data into Amazon Timestream, or we might opt to save it locally for testing purposes. Thus, we have a writer interface that defines the methods that need to be implemented by the concrete classes, such as AWSTimeStream or S3Parquet.
from abc import ABC, abstractmethod
from typing import List

import pandas as pd


class WriterInterface(ABC):
    """
    Abstract class defining the interface for a writer object.

    This interface ensures that the writer has methods to append data,
    check if a buffer is full, and save & refresh its contents.
    """

    @abstractmethod
    def append(self, data: List[dict], event_type):
        """
        Appends the given data.

        Args:
            data (List[dict]): Data to append.
        """
        pass

    @abstractmethod
    def is_buffer_full(self, event_type) -> bool:
        """
        Checks if the buffer is full.

        Returns:
            bool: True if buffer is full, False otherwise.
        """
        pass

    @abstractmethod
    def save_and_refresh(self, event_type):
        """
        Saves the current buffer's data and then refreshes the buffer.
        """
        pass

    def _list_of_dict_to_df(self, data: List[dict]) -> pd.DataFrame:
        """
        Converts a list of dictionaries to a pandas DataFrame.

        Args:
            data (List[dict]): List of dictionaries to be converted.

        Returns:
            pd.DataFrame: The converted pandas DataFrame.
        """
        return pd.DataFrame(data)
Note that typically, interfaces wouldn’t contain concrete methods, but in Python, we don’t have such clear-cut distinctions. That’s why we use WriterInterface both as an interface and also as a parent class that holds some common methods to be used by the concrete classes.
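To illustrate how a concrete writer plugs into this, here is a rough sketch of a local Parquet writer one could use for testing. The class name, constructor arguments, and file layout are hypothetical rather than taken from our repo.

import time
from pathlib import Path
from typing import Dict, List


class LocalParquetWriter(WriterInterface):
    """Hypothetical writer that buffers records per event type and dumps them to local Parquet files."""

    def __init__(self, out_dir: str, buffer_size: int):
        self.out_dir = Path(out_dir)
        self.buffer_size = buffer_size
        self.buffers: Dict[str, List[dict]] = {}

    def append(self, data: List[dict], event_type):
        self.buffers.setdefault(event_type, []).extend(data)

    def is_buffer_full(self, event_type) -> bool:
        return len(self.buffers.get(event_type, [])) >= self.buffer_size

    def save_and_refresh(self, event_type):
        records = self.buffers.get(event_type, [])
        if not records:
            return
        # Reuse the helper inherited from WriterInterface
        df = self._list_of_dict_to_df(records)
        target_dir = self.out_dir / event_type
        target_dir.mkdir(parents=True, exist_ok=True)
        # Requires pyarrow or fastparquet to be installed
        df.to_parquet(target_dir / f"{int(time.time())}.parquet")
        self.buffers[event_type] = []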
At runtime, based on the requested writer type, we instantiate the corresponding writer class through our writer factory. Below is a simple code snippet to instantiate the S3ParquetWriter class.
def create_parquet_s3_writer(buffer_size: int,
                             partition_cols: List[str] = None) -> S3ParquetWriter:
    """
    Create an S3ParquetWriter instance using a specified configuration.

    Args:
        buffer_size (int): The size of the buffer to use.
        partition_cols (List[str], optional): The list of columns to use for partitioning.

    Returns:
        S3ParquetWriter: A configured S3ParquetWriter instance.
    """
    # Load the S3 configuration details from a centralized location
    config = load_config_by_name("data")["s3"]
    return S3ParquetWriter(config["bucket"], config["prefix"], buffer_size, partition_cols)
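On top of such factory functions, a small dispatch helper can map the configured writer type to the right factory. The registry below is a hypothetical sketch of that idea rather than the exact code from the repo.

from typing import List


def create_writer(writer_type: str, buffer_size: int,
                  partition_cols: List[str] = None) -> WriterInterface:
    """Instantiate a concrete writer based on the configured writer type."""
    factories = {
        "s3_parquet": create_parquet_s3_writer,
        # "local_parquet": create_local_parquet_writer,  # hypothetical additional writer
    }
    if writer_type not in factories:
        raise ValueError(f"Unknown writer type: {writer_type}")
    return factories[writer_type](buffer_size, partition_cols)


# Example usage:
# writer = create_writer("s3_parquet", buffer_size=500, partition_cols=["exchange", "date"])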
Finally, we have our DataCollector module, which takes the list of exchanges and a concrete writer instance. We aimed to adhere to the composition principle to enable flexibility and reuse. For a deeper dive into the code, please refer to our repo.
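For context, the composition looks roughly like the sketch below: a collector that owns a list of exchanges and a writer, polls order books, and flushes full buffers. The method names, the single "orderbook" event type, and the polling loop are simplified assumptions, not the exact implementation.

import time
from typing import List


class DataCollector:
    """Polls a set of exchanges and hands the snapshots to the configured writer."""

    def __init__(self, exchanges: List[ExchangeInterface], writer: WriterInterface,
                 pairs: List[str], sleep_duration: float = 1.0):
        self.exchanges = exchanges
        self.writer = writer
        self.pairs = pairs
        self.sleep_duration = sleep_duration

    def run(self):
        while True:
            for exchange in self.exchanges:
                for pair in self.pairs:
                    orderbook = exchange.fetch_orderbook(pair)
                    record = {"exchange": exchange.name, "pair": pair, **orderbook}
                    self.writer.append([record], event_type="orderbook")
            if self.writer.is_buffer_full("orderbook"):
                self.writer.save_and_refresh("orderbook")
            time.sleep(self.sleep_duration)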
Now, let’s shift our focus to the continuous deployment part. We aspired to have a very straightforward CD: as soon as we push code to the master branch, we want to recreate the cloud tech stack and continue pulling data with the new code. Ideally, one would ensure that no data is lost during the transition. However, our project was more of a POC and research-oriented, so we could tolerate some missing data points.
To achieve our goal, we utilized GitHub Actions, S3, and CloudFormation.
name: Deploy to S3

on:
  push:
    branches:
      - master

jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - name: Check-out repository
        uses: actions/checkout@v2

      - name: Deploy to S3
        uses: jakejarvis/s3-sync-action@master
        with:
          args: --follow-symlinks --exclude '.git/*' --exclude '.github/*'
        env:
          AWS_S3_BUCKET: ibrahimcikotest
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          AWS_REGION: 'eu-west-1' # or your preferred AWS region
          DEST_DIR: 'exchanges'

      - name: Update CloudFormation Stack
        run: |
          aws cloudformation delete-stack --stack-name myteststack
          # Wait for stack deletion to be completed
          aws cloudformation wait stack-delete-complete --stack-name myteststack
          aws cloudformation create-stack --stack-name myteststack --template-body file://./DataCollectorEC2Setup.yaml --parameters ParameterKey=S3BucketName,ParameterValue=ibrahimcikotest --capabilities CAPABILITY_IAM
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          AWS_DEFAULT_REGION: 'eu-west-1'
In this YAML file, as soon as there is a push to our master branch, we upload the latest code from the master branch to S3. Then, we delete the existing stack and recreate it from our CloudFormation template.
aws cloudformation create-stack --stack-name myteststack --template-body file://./DataCollectorEC2Setup.yaml --parameters ParameterKey=S3BucketName,ParameterValue=ibrahimcikotest --capabilities CAPABILITY_IAM
In our CloudFormation template, we have an initialization script under UserData that runs when the EC2 instance launches. This initialization script pulls the latest code from S3 to the EC2 instance, addresses some permission issues, and executes the ETL according to our configuration. Here is a glimpse of our init script in CloudFormation.
UserData:
  Fn::Base64: !Sub
    - |
      #!/bin/bash
      # Create a directory in the home directory of the EC2 user to store your code
      mkdir -p /home/ec2-user/${RepositoryName}

      # Fix ownership and permissions so ec2-user can work with the code
      sudo chown -R ec2-user:ec2-user /home/ec2-user/${RepositoryName}/
      sudo chmod -R 755 /home/ec2-user/${RepositoryName}

      # Sync the latest code from S3 to this directory
      aws s3 sync s3://${S3BucketName}/${RepositoryName} /home/ec2-user/${RepositoryName}

      # Navigate to this directory
      cd /home/ec2-user/${RepositoryName}

      # Make the script executable and then run it
      chmod +x init.sh
      ./init.sh ${Environment}
    - RepositoryName: !FindInMap ["Constants", "Values", "RepositoryName"]
      WriterType: !Ref WriterType
      BufferSize: !Ref BufferSize
      SleepDuration: !Ref SleepDuration
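The init.sh script ultimately launches the Python entrypoint with the configuration passed down from CloudFormation. A hypothetical, heavily simplified version of such an entrypoint could look like the following; the argument names mirror the template parameters, and the create_writer, DataCollector, and Bitvavo names come from the sketches earlier in this post, not verbatim from the repo.

import argparse


def main():
    # Hypothetical entrypoint; the real project wires this up through its config files.
    parser = argparse.ArgumentParser(description="Run the exchange data collector.")
    parser.add_argument("--writer-type", default="s3_parquet")
    parser.add_argument("--buffer-size", type=int, default=500)
    parser.add_argument("--sleep-duration", type=float, default=1.0)
    args = parser.parse_args()

    writer = create_writer(args.writer_type, buffer_size=args.buffer_size,
                           partition_cols=["exchange", "date"])
    collector = DataCollector(exchanges=[Bitvavo()], writer=writer,
                              pairs=["BTC-EUR"], sleep_duration=args.sleep_duration)
    collector.run()


if __name__ == "__main__":
    main()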
Many details have been omitted for brevity, such as the structure for configuration, the use of poetry as a package manager, and so forth. Nonetheless, I believe I’ve covered the main points of this straightforward ETL system. I hope it is helpful for anyone looking to explore the world of algorithmic trading :D
My advice is to delve into the technology and learn as much as you can, but I neither recommend nor advise against engaging in actual trading. That decision is entirely up to you.