In my previous post, we saw how to submit a PySpark job to an AWS EMR cluster. In this post, I will go over the setup of the cluster.
Before we start with the cluster, we need a certificate key pair (.pem file) and a security group set up in AWS. There are many resources available online for this, so I will not go into the details.
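If you prefer to script this step, a minimal boto3 sketch along the following lines should do the job; the key pair name, security group name and ingress rule below are placeholders, not the ones I actually used.

# Hypothetical sketch: create a key pair and a security group with boto3.
# Names, region and the SSH CIDR are placeholders.
import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

# create the key pair and save the private key locally as a .pem file
key = ec2.create_key_pair(KeyName='spark-pfe-key')
with open('spark-pfe-key.pem', 'w') as f:
    f.write(key['KeyMaterial'])

# create a security group that allows SSH access to the master node
sg = ec2.create_security_group(GroupName='spark-pfe-sg',
                               Description='SSH access to the EMR master')
ec2.authorize_security_group_ingress(GroupId=sg['GroupId'], IpProtocol='tcp',
                                     FromPort=22, ToPort=22, CidrIp='0.0.0.0/0')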
Next is selecting the custom AMI that I mentioned in my previous post as the base machine for all nodes in the cluster, so that Anaconda Python and QuantLib are already installed on every node. The AMI I used is publicly available for now; it costs me about $5 per month to keep it alive. The details of the AMI are shown below.

I selected 1 m3.xlarge (4 cores, 15 GB) machine for my master node and 3 r4.8xlarge (16 cores, 244 GB) machines for my worker nodes. I also always select spot pricing for all my nodes.
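The same kind of cluster can also be launched programmatically; below is a rough boto3 sketch of what that would look like. The release label, AMI id, key name, region and bid prices are placeholders rather than my actual values.

# Hypothetical sketch: launch a similar EMR cluster with boto3.
# ReleaseLabel, CustomAmiId, key name, bid prices and region are placeholders.
import boto3

emr = boto3.client('emr', region_name='us-east-1')

response = emr.run_job_flow(
    Name='spark-pfe-cluster',
    ReleaseLabel='emr-5.11.0',
    Applications=[{'Name': 'Spark'}],
    CustomAmiId='ami-xxxxxxxx',               # the Anaconda/QuantLib AMI
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    Instances={
        'Ec2KeyName': 'spark-pfe-key',
        'KeepJobFlowAliveWhenNoSteps': True,  # leave the cluster in waiting mode
        'InstanceGroups': [
            {'Name': 'Master', 'InstanceRole': 'MASTER', 'InstanceType': 'm3.xlarge',
             'InstanceCount': 1, 'Market': 'SPOT', 'BidPrice': '0.10'},
            {'Name': 'Workers', 'InstanceRole': 'CORE', 'InstanceType': 'r4.8xlarge',
             'InstanceCount': 3, 'Market': 'SPOT', 'BidPrice': '0.80'},
        ],
    },
)
print response['JobFlowId']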
Once the cluster is up and running (in waiting mode), you first need to secure copy the spark-pfe.py file (available in my spark-pfe GitHub repo) to the master node, and then you can SSH into the master node to run the job using the spark-submit script from the previous post.
It took about 7 minutes for the PySpark job to complete; it computed the netting set NPV for 5,000 simulations across 454 future dates for 2 swaps and 1 FxFwd. The output of the simulation was written to an S3 bucket, and now it's time to pull it onto a local machine for analysis. Loading the NPV cube on a local machine is fine for demonstration purposes since we have only 5,000 simulations, but for production purposes I would load the NPV cube into Amazon Redshift or AuroraDB.
We will use the boto3 library to download the files from S3.
from pylab import *
from boto3.session import Session
from io import StringIO

NSim = 5000  # I know the number of simulations

session = Session(aws_access_key_id='AWS_ACCESS_KEY',
                  aws_secret_access_key='AWS_SECRET_ACCESS_KEY')
s3 = session.resource("s3")
s3.Bucket('your-bucket').download_file('output/time-grid/part-00000', 'time_grid')
s3.Bucket('your-bucket').download_file('output/npv_cube/part-00000', 'npv_cube')
#load the files into numpy arrays for analysis
T = np.loadtxt('time_grid')

data_file = open('npv_cube', 'r')
block = ''
npv_cube = np.zeros((NSim, len(T), 2))

#quick way to load, I am sure there are better ways
for line in data_file:
    block += line.replace('[', '').replace(']', '').lstrip()
data_file.close()

npv_cube = np.loadtxt(StringIO(unicode(block)))
#reshape to array with NSim rows, time-grid columns and with each cell having 2 NPVs
npv_cube = npv_cube.reshape(NSim, len(T), 2)
Once we have the time grid and the NPV cube in memory, we can do some plots to visualize the simulated exposure paths. The blue paths are the collateralized exposures and the red paths are the uncollateralized ones.
#plot 30 simulated exposure paths (out of 5000)
plt.figure(figsize=(7, 5), dpi=300)
for i in range(0, 30):
    plt.plot(T, npv_cube[i, :, 0] / 1000., 'r')  # Uncollateralized
    plt.plot(T, npv_cube[i, :, 1] / 1000., 'b')  # Collateralized
plt.xlabel("Time in years")
plt.ylabel("Exposure in Thousands")
plt.title("Simulated Exposure paths")
plt.show()
Now we can calculate and plot the expected exposure, where we average just the positive part of the simulated exposure paths above.
# Calculate the expected exposure
# take copies of the slices so the NPV cube itself is not modified in place
uncoll_exposures = npv_cube[:, :, 0].copy()
uncoll_exposures[uncoll_exposures < 0] = 0
uncoll_expected_exposures = np.sum(uncoll_exposures, axis=0) / NSim

coll_exposures = npv_cube[:, :, 1].copy()
coll_exposures[coll_exposures < 0] = 0
coll_expected_exposures = np.sum(coll_exposures, axis=0) / NSim

plt.figure(figsize=(7, 5), dpi=300)
plt.plot(T, uncoll_expected_exposures / 1000., 'r', label='Uncollateralized')
plt.plot(T, coll_expected_exposures / 1000., 'b', label='Collateralized')
plt.xlabel("Time in years")
plt.ylabel("Expected Exposure in Thousands")
plt.title("Expected Exposure")
plt.legend(loc='upper left')
Now we can plot the PFE curves, where at each date we take the 95% quantile of the positive exposures across the simulated paths.
# Calculate the PFE curve (95% quantile)
uncoll_PFE_curve = np.percentile(uncoll_exposures, 95, axis=0, interpolation='higher')
coll_PFE_curve = np.percentile(coll_exposures, 95, axis=0, interpolation='higher')

plt.figure(figsize=(7, 7), dpi=300)
plt.plot(T, uncoll_PFE_curve, 'r', label='Uncollateralized')
plt.plot(T, coll_PFE_curve, 'b', label='Collateralized')
plt.xlabel("Time in years")
plt.ylabel("PFE")
plt.title("PFE Curves")
plt.legend(loc='upper left')
We can now calculate the maximum PFE for both curves.
# calculate the maximum pfe
MPFE = np.max(uncoll_PFE_curve)
print 'Maximum Uncollateralized PFE: %f' % MPFE

MPFE = np.max(coll_PFE_curve)
print 'Maximum Collateralized PFE: %f' % MPFE
Maximum Uncollateralized PFE: 260,962.61
Maximum Collateralized PFE: 252,916.08
The spark-submit configuration that I used is by no means optimized. I tuned the various Spark memory and executor-count parameters by trial and error within the limited time frame I had. The configuration I came up with works well enough for the cluster I used, but I am sure it can be improved; a rough sketch of the kind of settings involved is shown below.
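For reference, these are the standard Spark knobs I mean. The sketch below sets them in code rather than as flags on the spark-submit command line, and the numbers are purely illustrative, not the values I settled on.

# Illustrative only: typical executor and memory settings for a cluster of this size.
# The numbers below are placeholders, not the values I actually used.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName('spark-pfe')
        .set('spark.executor.instances', '12')    # executors spread over the 3 worker nodes
        .set('spark.executor.cores', '4')         # cores per executor
        .set('spark.executor.memory', '50g')      # heap per executor
        .set('spark.default.parallelism', '96'))  # partitions for the simulations

sc = SparkContext(conf=conf)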
Thanks for stopping by.