KMeans with Persistent Storage
KMeans is a machine-learning algorithm (NP-hard), popularly employed for cluster analysis in data mining, and interesting for benchmarking and performance evaluation.
The objective of the KMeans algorithm is to group a set of multidimensional points into a predefined number of clusters, in which each point belongs to the closest cluster (the one with the nearest mean), through an iterative process.
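For reference, the core update of one iteration (Lloyd's step: assign every point to its nearest centre, then move each centre to the mean of its assigned points) can be sketched in a few lines of NumPy. This is a minimal, centralized illustration with names of our own choosing, not the distributed version developed below:

import numpy as np
from sklearn.metrics import pairwise_distances

def lloyd_iteration(points, centres):
    """One KMeans step: assign each point to its closest centre,
    then recompute each centre as the mean of its assigned points."""
    labels = pairwise_distances(points, centres).argmin(axis=1)
    new_centres = centres.copy()
    for k in range(centres.shape[0]):
        members = points[labels == k]
        if members.shape[0] > 0:
            new_centres[k] = members.mean(axis=0)
    return new_centres, labels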
In this application we make use of the persistent storage API. In particular, the dataset fragments are considered StorageObjects, delegating their content to the persistent framework. Since the data model (the class declared as a storage object) includes methods, it can run efficiently with dataClay.
First, let's look at the data model (storage_model/fragment.py):
from storage.api import StorageObject

try:
    from pycompss.api.task import task
    from pycompss.api.parameter import IN
except ImportError:
    # Required since the pycompss module is not ready during the registry
    from dataclay.contrib.dummy_pycompss import task, IN

from dataclay import dclayMethod

import numpy as np
from sklearn.metrics import pairwise_distances


class Fragment(StorageObject):
    """
    @ClassField points numpy.ndarray

    @dclayImport numpy as np
    @dclayImportFrom sklearn.metrics import pairwise_distances
    """
    @dclayMethod()
    def __init__(self):
        super(Fragment, self).__init__()
        self.points = None

    @dclayMethod(num_points='int', dim='int', mode='str', seed='int')
    def generate_points(self, num_points, dim, mode, seed):
        """
        Generate a random fragment of the specified number of points using the
        specified mode and the specified seed. Note that the generation is
        distributed (the master will never see the actual points).
        :param num_points: Number of points
        :param dim: Number of dimensions
        :param mode: Dataset generation mode
        :param seed: Random seed
        :return: Dataset fragment
        """
        # Random generation distributions
        rand = {
            'normal': lambda k: np.random.normal(0, 1, k),
            'uniform': lambda k: np.random.random(k),
        }
        r = rand[mode]
        np.random.seed(seed)
        mat = np.asarray(
            [r(dim) for __ in range(num_points)]
        )
        # Normalize all points between 0 and 1
        mat -= np.min(mat)
        mx = np.max(mat)
        if mx > 0.0:
            mat /= mx

        self.points = mat

    @task(returns=np.ndarray, target_direction=IN)
    @dclayMethod(centres='numpy.ndarray', return_='anything')
    def partial_sum(self, centres):
        partials = np.zeros((centres.shape[0], 2), dtype=object)
        arr = self.points
        close_centres = pairwise_distances(arr, centres).argmin(axis=1)
        for center_idx, _ in enumerate(centres):
            indices = np.argwhere(close_centres == center_idx).flatten()
            partials[center_idx][0] = np.sum(arr[indices], axis=0)
            partials[center_idx][1] = indices.shape[0]
        return partials
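Before moving on to the application, a hypothetical usage sketch may help to see how this class is meant to be used. It assumes an active dataClay backend and an already registered data model; the configuration path and sizes are illustrative, and storage.api.init/finish are the standard storage API entry points that the PyCOMPSs runtime otherwise calls for you:

import numpy as np
from storage.api import init, finish
from storage_model.fragment import Fragment

init("dataClay_confs/session.properties")  # start the storage session
frag = Fragment()
frag.make_persistent()   # from here on, the object lives in dataClay
frag.generate_points(num_points=100, dim=2, mode='uniform', seed=42)

centres = np.random.random((5, 2))
partials = frag.partial_sum(centres)  # executed next to the data, in the backend
print(partials)
finish()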
Now we can focus on the main KMeans application (kmeans.py):
import time

import numpy as np

from pycompss.api.task import task
from pycompss.api.api import compss_wait_on
from pycompss.api.api import compss_barrier

from storage_model.fragment import Fragment

from sklearn.metrics.pairwise import paired_distances


@task(returns=dict)
def merge(*data):
    accum = data[0].copy()
    for d in data[1:]:
        accum += d
    return accum


def converged(old_centres, centres, epsilon, iteration, max_iter):
    if old_centres is None:
        return False
    dist = np.sum(paired_distances(centres, old_centres))
    return dist < epsilon ** 2 or iteration >= max_iter


def recompute_centres(partials, old_centres, arity):
    centres = old_centres.copy()
    while len(partials) > 1:
        partials_subset = partials[:arity]
        partials = partials[arity:]
        partials.append(merge(*partials_subset))
    partials = compss_wait_on(partials)
    for idx, sum_ in enumerate(partials[0]):
        if sum_[1] != 0:
            centres[idx] = sum_[0] / sum_[1]
    return centres


def kmeans_frag(fragments, dimensions, num_centres=10, iterations=20,
                seed=0, epsilon=1e-9, arity=50):
    """
    A fragment-based K-Means algorithm.
    Given a set of fragments (which can be either PSCOs or future objects that
    point to PSCOs), the desired number of clusters and the maximum number of
    iterations, compute the optimal centres and the index of the centre
    for each point.
    PSCO.points must be a NxD float np.ndarray, where D = dimensions
    :param fragments: List of dataset fragments
    :param dimensions: Number of dimensions
    :param num_centres: Number of centres
    :param iterations: Maximum number of iterations
    :param seed: Random seed
    :param epsilon: Epsilon (convergence distance)
    :param arity: Arity of the merge reduction
    :return: Final centres
    """
    # Set the random seed
    np.random.seed(seed)
    # Centres is usually a very small matrix, so it is affordable to have it in
    # the master.
    centres = np.asarray(
        [np.random.random(dimensions) for _ in range(num_centres)]
    )
    # Note: this implementation keeps the centres as plain numpy arrays in the
    # master, never as PSCOs.
    old_centres = None
    iteration = 0
    while not converged(old_centres, centres, epsilon, iteration, iterations):
        print("Doing iteration #%d/%d" % (iteration + 1, iterations))
        old_centres = centres.copy()
        partials = []
        for frag in fragments:
            partial = frag.partial_sum(old_centres)
            partials.append(partial)
        centres = recompute_centres(partials, old_centres, arity)
        iteration += 1
    return centres


def parse_arguments():
    """
    Parse command line arguments. Make the program generate
    a help message in case of wrong usage.
    :return: Parsed arguments
    """
    import argparse
    parser = argparse.ArgumentParser(description='KMeans Clustering.')
    parser.add_argument('-s', '--seed', type=int, default=0,
                        help='Pseudo-random seed. Default = 0')
    parser.add_argument('-n', '--numpoints', type=int, default=100,
                        help='Number of points. Default = 100')
    parser.add_argument('-d', '--dimensions', type=int, default=2,
                        help='Number of dimensions. Default = 2')
    parser.add_argument('-c', '--num_centres', type=int, default=5,
                        help='Number of centres. Default = 5')
    parser.add_argument('-f', '--fragments', type=int, default=10,
                        help='Number of fragments.' +
                             ' Default = 10. Condition: fragments < points')
    parser.add_argument('-m', '--mode', type=str, default='uniform',
                        choices=['uniform', 'normal'],
                        help='Distribution of points. Default = uniform')
    parser.add_argument('-i', '--iterations', type=int, default=20,
                        help='Maximum number of iterations')
    parser.add_argument('-e', '--epsilon', type=float, default=1e-9,
                        help='Epsilon. Kmeans will stop when:' +
                             ' |old - new| < epsilon^2.')
    parser.add_argument('-a', '--arity', type=int, default=50,
                        help='Arity of the reduction carried out during \
                              the computation of the new centroids')
    parser.add_argument('--use_storage', action='store_true',
                        help='Use persistent storage (accepted for '
                             'compatibility with the launch script)')
    return parser.parse_args()
from storage_model.fragment import Fragment  # this will have to be removed


@task(returns=Fragment)
def generate_fragment(points, dim, mode, seed):
    """
    Generate a random fragment of the specified number of points using the
    specified mode and the specified seed. Note that the generation is
    distributed (the master will never see the actual points).
    :param points: Number of points
    :param dim: Number of dimensions
    :param mode: Dataset generation mode
    :param seed: Random seed
    :return: Dataset fragment
    """
    fragment = Fragment()
    # Make persistent before populating it, so that generate_points fills
    # the object directly in the storage backend.
    fragment.make_persistent()
    fragment.generate_points(points, dim, mode, seed)
    return fragment
def main(seed, numpoints, dimensions, num_centres, fragments, mode, iterations,
         epsilon, arity, use_storage):
    """
    This will be executed if called as main script. Look at kmeans_frag
    for the KMeans function.
    This code is used for experimental purposes, i.e. it generates random
    data from some parameters that determine the size, dimensionality, etc.
    of the dataset, and reports the elapsed time.
    :param seed: Random seed
    :param numpoints: Number of points
    :param dimensions: Number of dimensions
    :param num_centres: Number of centres
    :param fragments: Number of fragments
    :param mode: Dataset generation mode
    :param iterations: Number of iterations
    :param epsilon: Epsilon (convergence distance)
    :param arity: Arity
    :param use_storage: Use persistent storage (always true in this version)
    :return: None
    """
    start_time = time.time()

    # Generate the data
    fragment_list = []
    # Prevent infinite loops in case of not-so-smart users
    points_per_fragment = max(1, numpoints // fragments)

    for l in range(0, numpoints, points_per_fragment):
        # Note that the seed is different for each fragment.
        # This is done to avoid having repeated data.
        r = min(numpoints, l + points_per_fragment)

        fragment_list.append(
            generate_fragment(r - l, dimensions, mode, seed + l)
        )

    compss_barrier()
    print("Generation/Load done")
    initialization_time = time.time()
    print("Starting kmeans")

    # Run kmeans
    centres = kmeans_frag(fragments=fragment_list,
                          dimensions=dimensions,
                          num_centres=num_centres,
                          iterations=iterations,
                          seed=seed,
                          epsilon=epsilon,
                          arity=arity)
    compss_barrier()
    print("Ending kmeans")
    kmeans_time = time.time()

    print("-----------------------------------------")
    print("-------------- RESULTS ------------------")
    print("-----------------------------------------")
    print("Initialization time: %f" % (initialization_time - start_time))
    print("Kmeans time: %f" % (kmeans_time - initialization_time))
    print("Total time: %f" % (kmeans_time - start_time))
    print("-----------------------------------------")
    centres = compss_wait_on(centres)
    print("CENTRES:")
    print(centres)
    print("-----------------------------------------")


if __name__ == "__main__":
    options = parse_arguments()
    main(**vars(options))
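The arity parameter deserves a remark: recompute_centres does not accumulate all partial results one by one in the master, but folds the list of partials in groups of up to arity elements, each group being merged by an asynchronous merge task; this yields a reduction tree whose depth grows logarithmically with the number of fragments. The folding logic can be reproduced with plain Python, using an ordinary function as a stand-in for the merge task (an illustration, not part of the application):

def tree_reduce(items, merge, arity):
    # Repeatedly take up to `arity` elements and replace them by their merge.
    # With PyCOMPSs, each merge() is a task, so independent groups run in parallel.
    items = list(items)
    while len(items) > 1:
        group, items = items[:arity], items[arity:]
        items.append(merge(group))
    return items[0]

print(tree_reduce(range(10), merge=sum, arity=3))  # prints 45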
Tip
This code can also work with Hecuba and Redis if the methods declared in the data model are declared outside the data model, and the KMeans application uses the points attribute explicitly, as sketched below.
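As a sketch of that storage-agnostic style (hypothetical code, not part of this example), partial_sum would become a regular PyCOMPSs task that receives the fragment and reads its points attribute directly:

import numpy as np
from pycompss.api.task import task
from sklearn.metrics import pairwise_distances

@task(returns=np.ndarray)
def partial_sum(fragment, centres):
    # Access the persistent attribute explicitly, instead of calling a
    # method registered in the storage data model.
    arr = fragment.points
    partials = np.zeros((centres.shape[0], 2), dtype=object)
    close_centres = pairwise_distances(arr, centres).argmin(axis=1)
    for center_idx, _ in enumerate(centres):
        indices = np.argwhere(close_centres == center_idx).flatten()
        partials[center_idx][0] = np.sum(arr[indices], axis=0)
        partials[center_idx][1] = indices.shape[0]
    return partials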
Since this code is going to be executed with dataClay, it is necessary to place the client.properties, session.properties and storage_props.cfg files into the dataClay_confs folder, with the following contents as an example (more configuration options can be found in the dataClay manual):
- client.properties

HOST=127.0.0.1
TCPPORT=11034

- session.properties

Account=bsc_user
Password=bsc_user
StubsClasspath=./stubs
DataSets=hpc_dataset
DataSetForStore=hpc_dataset
DataClayClientConfig=./client.properties

- storage_props.cfg

BACKENDS_PER_NODE=48
An example of the submission script that can be used in MareNostrum IV to launch this kmeans with PyCOMPSs and dataClay is:
#!/bin/bash -e

module load gcc/8.1.0
export COMPSS_PYTHON_VERSION=3-ML
module load COMPSs/2.7
module load mkl/2018.1
module load impi/2018.1
module load opencv/4.1.2
module load DATACLAY/2.4.dev

# Retrieve script arguments
job_dependency=${1:-None}
num_nodes=${2:-2}
execution_time=${3:-5}
tracing=${4:-false}
exec_file=${5:-$(pwd)/kmeans.py}

# Freeze storage_props into a temporal file
# (allows the submission of multiple executions with varying parameters)
STORAGE_PROPS=`mktemp -p ~`
cp $(pwd)/dataClay_confs/storage_props.cfg "${STORAGE_PROPS}"

if [[ ! ${tracing} == "false" ]]
then
  extra_tracing_flags="\
    --jvm_workers_opts=\"-javaagent:/apps/DATACLAY/dependencies/aspectjweaver.jar\" \
    --jvm_master_opts=\"-javaagent:/apps/DATACLAY/dependencies/aspectjweaver.jar\" \
  "
  echo "Adding DATACLAYSRV_START_CMD to storage properties file"
  echo "\${STORAGE_PROPS}=${STORAGE_PROPS}"
  echo "" >> ${STORAGE_PROPS}
  echo "DATACLAYSRV_START_CMD=\"--tracing\"" >> ${STORAGE_PROPS}
fi

# Define script variables
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
WORK_DIR=${SCRIPT_DIR}/
APP_CLASSPATH=${SCRIPT_DIR}/
APP_PYTHONPATH=${SCRIPT_DIR}/

# Define application variables
graph=$tracing
log_level="off"
qos_flag="--qos=debug"
workers_flag=""
constraints="highmem"

CPUS_PER_NODE=48
WORKER_IN_MASTER=0

shift 5

# Those are evaluated at submit time, not at start time...
COMPSS_VERSION=`module whatis COMPSs 2>&1 >/dev/null | awk '{print $1 ; exit}'`
DATACLAY_VERSION=`module whatis DATACLAY 2>&1 >/dev/null | awk '{print $1 ; exit}'`

# Enqueue the job
enqueue_compss \
  --job_name=kmeansOO_PyCOMPSs_dataClay \
  --job_dependency="${job_dependency}" \
  --exec_time="${execution_time}" \
  --num_nodes="${num_nodes}" \
  \
  --cpus_per_node="${CPUS_PER_NODE}" \
  --worker_in_master_cpus="${WORKER_IN_MASTER}" \
  --scheduler=es.bsc.compss.scheduler.loadbalancing.LoadBalancingScheduler \
  \
  "${workers_flag}" \
  \
  --worker_working_dir=/gpfs/scratch/user/ \
  \
  --constraints=${constraints} \
  --tracing="${tracing}" \
  --graph="${graph}" \
  --summary \
  --log_level="${log_level}" \
  "${qos_flag}" \
  \
  --classpath=${DATACLAY_JAR} \
  --pythonpath=${APP_PYTHONPATH}:${PYCLAY_PATH}:${PYTHONPATH} \
  --storage_props=${STORAGE_PROPS} \
  --storage_home=$COMPSS_STORAGE_HOME \
  --prolog="$DATACLAY_HOME/bin/dataclayprepare,$(pwd)/storage_model/,$(pwd)/,storage_model,python" \
  \
  ${extra_tracing_flags} \
  \
  --lang=python \
  \
  "$exec_file" $@ --use_storage