Introduction
In this post, I’ll be implementing a Random Forest classifier from scratch in Python. This is the ninth post in the “Machine Learning from Scratch” series.
Random Forest is an ensemble learning method that combines multiple decision trees to create a more robust and accurate classifier. It’s one of the most popular machine learning algorithms due to its high performance and resistance to overfitting.
Random Forest
Random Forest builds multiple decision trees on random subsets of the training data and features, then aggregates their predictions through voting (for classification) or averaging (for regression).
The key ideas behind Random Forest are:
- Bootstrap Aggregating (Bagging): Each tree is trained on a random sample of the data with replacement
- Feature Randomness: Each split considers only a random subset of features
- Voting: The final prediction is the majority vote across all trees
These techniques reduce overfitting and improve generalization compared to a single decision tree.
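To make these ideas concrete, here is a minimal, self-contained sketch of bootstrap sampling and feature subsampling on a toy array (the names X_toy and y_toy are made up purely for illustration); the actual Random Forest implementation below does the same thing internally:

import numpy as np

# Toy data purely for illustration
X_toy = np.arange(24).reshape(6, 4)             # 6 samples, 4 features
y_toy = np.array([0, 1, 0, 1, 1, 0])

# Bagging: draw as many row indices as there are samples, with replacement
boot_idxs = np.random.choice(len(y_toy), size=len(y_toy), replace=True)
X_boot, y_boot = X_toy[boot_idxs], y_toy[boot_idxs]

# Feature randomness: a split only considers a random subset of the columns
feat_idxs = np.random.choice(X_toy.shape[1], size=2, replace=False)

print(boot_idxs)   # some rows repeat, others are left out
print(feat_idxs)   # e.g. two of the four feature indices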
Implementation
I’m using the decision tree implementation from the previous post as a building block. I’ll also use numpy for numerical computations and Counter from collections for majority voting.
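As a quick refresher, Counter makes the majority vote a one-liner:

from collections import Counter

votes = [1, 0, 1, 1, 2]                      # predictions from five hypothetical trees
print(Counter(votes).most_common(1)[0][0])   # 1, the majority class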
The RandomForest class has the following methods:
- __init__: Constructor to set the number of trees, tree parameters, and feature sampling.
- fit: Method to train multiple decision trees on bootstrapped samples.
- predict: Method to aggregate predictions from all trees.
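Before diving into the code, here is how the class is meant to be used, following the familiar fit/predict convention (X_train, y_train, and X_test are placeholders for your own data):

# Intended usage of the class defined below
forest = RandomForest(n_trees=20, max_depth=10)
forest.fit(X_train, y_train)      # builds 20 trees on bootstrapped samples
y_pred = forest.predict(X_test)   # majority vote across the 20 trees

The full implementation, including the DecisionTree it builds on, follows.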
import numpy as np
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn import datasets
class Node:
    def __init__(self, feature=None, threshold=None, left=None, right=None, value=None):
        self.feature = feature
        self.threshold = threshold
        self.left = left
        self.right = right
        self.value = value

    def is_leaf_node(self):
        return self.value is not None
class DecisionTree:
    def __init__(self, max_depth=10, min_samples_split=2, n_features=None):
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.n_features = n_features
        self.root = None

    def fit(self, X, y):
        # Never consider more features than the data actually has
        self.n_features = X.shape[1] if not self.n_features else min(X.shape[1], self.n_features)
        self.root = self._grow_tree(X, y)

    def _grow_tree(self, X, y, depth=0):
        num_samples, num_features = X.shape
        num_labels = len(np.unique(y))

        # Stopping criteria: max depth reached, pure node, or too few samples
        if depth >= self.max_depth or num_labels == 1 or num_samples < self.min_samples_split:
            leaf_value = self._most_common_label(y)
            return Node(value=leaf_value)

        # Feature randomness: each split considers only a random subset of features
        feat_idxs = np.random.choice(num_features, self.n_features, replace=False)
        best_feature, best_threshold = self._best_split(X, y, feat_idxs)

        # Partition the data and grow the left and right subtrees
        left_idxs = np.argwhere(X[:, best_feature] <= best_threshold).flatten()
        right_idxs = np.argwhere(X[:, best_feature] > best_threshold).flatten()
        left = self._grow_tree(X[left_idxs, :], y[left_idxs], depth + 1)
        right = self._grow_tree(X[right_idxs, :], y[right_idxs], depth + 1)
        return Node(best_feature, best_threshold, left, right)

    def _best_split(self, X, y, feat_idxs):
        # Greedily pick the (feature, threshold) pair with the highest information gain
        best_gain = -1
        split_idx, split_threshold = None, None
        for feat_idx in feat_idxs:
            X_column = X[:, feat_idx]
            thresholds = np.unique(X_column)
            for threshold in thresholds:
                gain = self._information_gain(y, X_column, threshold)
                if gain > best_gain:
                    best_gain = gain
                    split_idx = feat_idx
                    split_threshold = threshold
        return split_idx, split_threshold

    def _information_gain(self, y, X_column, threshold):
        # Information gain = parent entropy - weighted average of the children's entropy
        parent_entropy = self._entropy(y)
        left_idxs = np.argwhere(X_column <= threshold).flatten()
        right_idxs = np.argwhere(X_column > threshold).flatten()
        if len(left_idxs) == 0 or len(right_idxs) == 0:
            return 0
        n = len(y)
        n_l, n_r = len(left_idxs), len(right_idxs)
        e_l, e_r = self._entropy(y[left_idxs]), self._entropy(y[right_idxs])
        child_entropy = (n_l / n) * e_l + (n_r / n) * e_r
        return parent_entropy - child_entropy

    def _entropy(self, y):
        # Shannon entropy: -sum(p * log2(p)) over the class proportions
        hist = np.bincount(y)
        ps = hist / len(y)
        return -np.sum([p * np.log2(p) for p in ps if p > 0])

    def _most_common_label(self, y):
        counter = Counter(y)
        return counter.most_common(1)[0][0]

    def predict(self, X):
        return np.array([self._traverse_tree(x, self.root) for x in X])

    def _traverse_tree(self, x, node):
        # Walk down the tree until a leaf is reached
        if node.is_leaf_node():
            return node.value
        if x[node.feature] <= node.threshold:
            return self._traverse_tree(x, node.left)
        return self._traverse_tree(x, node.right)
class RandomForest:
    def __init__(self, n_trees=10, max_depth=10, min_samples_split=2, n_features=None):
        self.n_trees = n_trees
        self.max_depth = max_depth
        self.min_samples_split = min_samples_split
        self.n_features = n_features
        self.trees = []

    def fit(self, X, y):
        # Train each tree on its own bootstrapped sample of the data
        self.trees = []
        for _ in range(self.n_trees):
            tree = DecisionTree(
                max_depth=self.max_depth,
                min_samples_split=self.min_samples_split,
                n_features=self.n_features
            )
            X_sample, y_sample = self._bootstrap_samples(X, y)
            tree.fit(X_sample, y_sample)
            self.trees.append(tree)

    def _bootstrap_samples(self, X, y):
        # Draw as many rows as the dataset has, with replacement
        num_samples = X.shape[0]
        idxs = np.random.choice(num_samples, num_samples, replace=True)
        return X[idxs], y[idxs]

    def predict(self, X):
        # Shape (n_trees, n_samples) -> (n_samples, n_trees), then majority vote per sample
        predictions = np.array([tree.predict(X) for tree in self.trees])
        tree_preds = np.swapaxes(predictions, 0, 1)
        predictions = np.array([self._most_common_label(pred) for pred in tree_preds])
        return predictions

    def _most_common_label(self, y):
        counter = Counter(y)
        return counter.most_common(1)[0][0]
Now let’s test the Random Forest on a synthetic classification dataset generated with scikit-learn’s make_classification.
def accuracy(y_test, predictions):
    return np.sum(y_test == predictions) / len(y_test)


if __name__ == '__main__':
    X, y = datasets.make_classification(
        n_samples=1000, n_features=10, n_classes=3,
        n_informative=8, random_state=42
    )
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    model = RandomForest(n_trees=20, max_depth=10)
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)

    acc = accuracy(y_test, predictions)
    print(f"Accuracy: {acc}")
Random Forest typically achieves higher accuracy than a single decision tree and is much more resistant to overfitting. The ensemble approach combines the predictions of multiple trees, reducing variance while maintaining low bias.
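To see the effect for yourself, a quick (and admittedly unscientific) check is to train a single DecisionTree and the RandomForest on the same split and compare their test accuracies. This snippet assumes the X_train, X_test, y_train, and y_test variables from the test script above:

# Rough single-tree vs. forest comparison; exact numbers vary with the random seeds
single_tree = DecisionTree(max_depth=10)
single_tree.fit(X_train, y_train)
tree_acc = accuracy(y_test, single_tree.predict(X_test))

forest = RandomForest(n_trees=20, max_depth=10)
forest.fit(X_train, y_train)
forest_acc = accuracy(y_test, forest.predict(X_test))

print(f"Single tree: {tree_acc:.3f} | Random Forest: {forest_acc:.3f}")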
That’s all for this post. Thanks for reading!