Estimating Potential Future Exposure with QuantLib and AWS EMR – Part II

In my previous post, we saw how to submit a Pyspark job to AWS EMR cluster. In this post, I will go over the setup of the cluster.

Before we start with the cluster, we must have a certificate keypair (.pem file) and a security group setup in AWS. There are many resources available online for this and I will not go into the details.

Next is selecting the custom AMI that I mentioned in my previous post as a base machine for all nodes in the cluster so that Anaconda Python and QuantLib are already installed on the nodes.  The AMI that I used is publicly available for now, it costs me about $5 per month to keep it alive. The details of the AMI are as shown below.

AMI ID ami-c56a94bf

I selected 1 m3xlarge (4 cores, 15 GB) machine for my master node and 3 r4x8large (16 cores, 244 GB) machines for my worker nodes. I also always select spot pricing for all my nodes.

Once the cluster is up and running (in waiting mode), first you need to secure copy file (available at my github repo spark-pfe github repo) to the master node and then you can SSH into master node to run the job using spark-submit script from previous post.

It took about 7 minutes for Pyspark job to complete, it computed netting set NPV for 5000 simulations across future 454 dates for 2 swaps and 1 FxFwd. The output of the simulation was written to S3 bucket and now its time to pull it onto local machine for analysis. Loading the NPV cube on a local machine is fine for demonstration purposes as we have only 5000 simulations but I would load the NPV cube into Amazon Redshift or AuroraDB for production purposes.

We will use boto3 library for downloading the files from S3.

from pylab import *
from boto3.session import Session
from io import StringIO

NSim = 5000 # I know the number of simulations

session = Session(aws_access_key_id='AWS_ACCESS_KEY',
s3 = session.resource("s3")
s3.Bucket('your-bucket').download_file('output/time-grid/part-00000', 'time_grid')
s3.Bucket('your-bucket').download_file('output/npv_cube/part-00000', 'npv_cube')
#load the files into numpy arrays for analysis
T = np.loadtxt('time_grid')
data_file = open('npv_cube', 'r')
block = ''

npv_cube = np.zeros((NSim,len(T),2))

#quick way to load, I am sure there are better ways
for line in data_file:
    block += line.replace('[','').replace(']','').lstrip()

npv_cube = np.loadtxt(StringIO(unicode(block)))
#reshape to array with NSim rows, time-grid columns and with each cell having 2 NPVs
npv_cube = npv_cube.reshape(NSim, len(T), 2)

Once we have the time grid and NPV cube in memory, we can do some plots to visualize the simulated exposure paths. The Blue paths are for Collateralized exposures and Red are for Uncollateralized.

#plot 30 simulated exposure paths (out of 5000)
plt.figure(figsize=(7, 5), dpi=300)
for i in range(0, 30):
    plt.plot(T, npv_cube[i,:,0]/1000., 'r') # Uncollateralized
    plt.plot(T, npv_cube[i, :, 1] / 1000., 'b') # Collateralized
plt.xlabel("Time in years")
plt.ylabel("Exposure in Thousands")
plt.title("Simulated Exposure paths")

Now we can calculate and plot the Expected Exposure where we just take the positive exposures from the above simulated exposure paths.

# Calculate the expected exposure
E = npv_cube.copy()
uncoll_exposures = npv_cube[:,:,0]
uncoll_exposures[uncoll_exposures < 0] = 0
uncoll_expected_exposures = np.sum(uncoll_exposures, axis=0) / NSim

coll_exposures = npv_cube[:,:,1]
coll_exposures[coll_exposures < 0] = 0
coll_expected_exposures = np.sum(coll_exposures, axis=0) / NSim

plt.figure(figsize=(7, 5), dpi=300)
plt.plot(T, uncoll_expected_exposures/1000., 'r', label='Uncollateralized')
plt.plot(T, coll_expected_exposures/1000., 'b', label='Collateralized')
plt.xlabel("Time in years")
plt.ylabel("Expected Exposure in Thousands")
plt.title("Expected Exposure")
plt.legend(loc='upper left')

Now we can plot the PFE curves where we take the 95% quantile of above expected exposures.

# Calculate the PFE curve (95% quantile)
uncoll_PFE_curve = np.percentile(uncoll_exposures, 95, axis=0, interpolation='higher')
coll_PFE_curve = np.percentile(coll_exposures, 95, axis=0, interpolation='higher')
plt.figure(figsize=(7,7), dpi=300)
plt.plot(T, uncoll_PFE_curve, 'r', label='Uncollateralized')
plt.plot(T, coll_PFE_curve, 'b', label='Collateralized')
plt.xlabel("Time in years")
plt.title("PFE Curves")
plt.legend(loc='upper left')

We can now calculate the Maximum PFE for both the curves.

# calculate the maximum pfe
MPFE = np.max(uncoll_PFE_curve)
print 'Maximum Uncollateralized PFE:%f' % MPFE

MPFE = np.max(coll_PFE_curve)
print 'Maximum Collateralized PFE:%f' % MPFE

Maximum Uncollateralized PFE: 260,962.61

Maximum Collateralized PFE: 252,916.08

The spark-submit script that I have used is by no means optimized. I have tried tuning the various spark memory and number of executors parameters in a trial and error approach within a limited time frame I had. The configuration I came up with is optimal for the cluster I used  but I am sure it can be improved.

Thanks for stopping by.


Estimating Potential Future Exposure with QuantLib and AWS EMR – Part I

Counterparty risk is the risk that a party to an OTC derivatives contract may fail to perform on its contractual obligations, causing losses to the other party. Credit exposure is the actual loss in the event of a counterparty default.

Some of the ways to reduce counterparty risk :

Netting: Offset positive and negative contract values with the same counterparty reduces exposure to that counterparty

Collateral: Holding cash or securities against an exposure

Central counterparties (CCP): Use a third party clearing house as a counterparty between buyer and seller and post margin

Potential Future Exposure (PFE) is a measure of credit risk and is the worst exposure one could have to a counterparty at a certain time in future with a certain level of confidence. For example, for a PFE of $100,000 with 95% confidence, we expect to have an exposure (loss in case counterparty defaults with no recovery) greater than $100,000 in only 5% of scenarios.

Netting set is a group of OTC trades (could be interest rate swaps, FxFwds or CCS) that are facing the same counterparty. An ISDA CSA agreement with that counterparty defines how an exposure is treated within the netting set but usually we can “net” the exposures of different instruments in the set which reduces the exposure. For example, a positive exposure on a swap could be netted with a negative exposure on FxFwd.

Estimating PFE involves simulating future market risk scenarios, calculating “netted” MtM values of OTC trades that are facing the same counterparty at various dates in future at each scenario and taking only the positive MtMs which represent our exposure to counterparty,  then taking let’s say 95% quantile of the peak exposures.

For an interest rate swap, market risk factor is the underlying forward curve which determines the NPV of the floating leg. For an FxFwd, its the forward interest curves for the two currencies and the forward FX rate.

In this post, to generate future scenarios of the curves, I use Hull-White one factor short rate model which is assumed to be calibrated. There are many excellent resources available online which discuss interest rate models, PFE and QuantLib/Python in general, some of which I have used here are:

Hull White Term Structure Simulations

Expected Exposure and PFE Simulation

Derivatives CVA Calculation

The focus of this post is to provide a proof of concept of estimating PFE on Amazon Web Service’s ElasticMapReduce (EMR). AWS EMR provides a complete platform of Hadoop ecosystem tools to process vast amounts of data in dynamically scalable (hence “elastic”) environment. I will use Apache Spark hosted in EMR cluster to generate future curve simulations and perform NPV calculations on various dates for each of the scenarios. As you can see, for a realistic number of simulations, the number of calculations needed for a netting set comprised of a few swaps will easily exceed the capacity of a single machine.

Challenges and Issues:

But the devil’s always in the details, right? Here are some of the challenges I faced with this whole setup involving Amazon EMR, QuantLib and Python and ways to overcome them:

While developing my Spark PFE “job” to be submitted to EMR cluster, I needed to spin a cluster on demand. Obviously, I needed to terminate it as soon as I was done or else I will continue to be charged for it. This means that, every time I spin a new cluster, I will get fresh EC2 instances with no QuantLib or Anaconda Python installed on them. Now, installing Anaconda Python and compiling Boost and then QuantLib and then QuantLib SWIG wrappers and then making all of it work seamlessly is not for the faint of heart 🙂 But there is a way to solve the EC2 problem. AWS has something called AMI (Amazon Machine Image) which is really a base machine you can build your cluster with. So I spin up an EC2 instance, install/compile all the software I need and then save it as an AMI and use it as base machine for the cluster so that QuantLib/Python is already setup on all nodes from the get go.

However, AWS EMR requires that if you are going to use custom AMI for cluster nodes, it must be based on Amazon’s own Linux AMI. Amazon Linux AMI is loosely based on RHEL/CentOS but it’s not the same. I faced several issues in compiling Boost as well as QuantLib on this version of Linux and all of them were related to the gcc/g++ version that came with it which was really old (4.3) So I upgraded the gcc version to 6.4 and then installed Boost, QuantLib on top of it.

In a very simplified view, Spark is a distributed architecture with a master node and number of worker nodes. All the input, output and intermediate data is stored in-memory as resilient distributed datasets (RDDs). The objects are serialized and are distributed to the worker nodes in order to be processed. However, I quickly realized that QuantLib/Python SWIG objects cannot be serialized (or pickled), so we must re-create QuantLib objects like Swaps, Dates and Curves from pure Python objects on the worker nodes each time we need them.

I also wanted to incorporate collateral movements into my exposure calculations since all bilateral OTC swaps/FxFwds are required to be collateralized and wanted to see how collateral affects PFE. As it turns out, it reduces PFE but doesn’t completely eliminate it due to the margin period of risk (7 days in this post) and asymmetric collateral amounts delivered and posted due to Threshold and MTA requirements. I have referenced Jon Gregory’s Counterparty Credit Risk Second Edition book and spreadsheets for this purpose.

I have also added FxFwd (EUR/USD in this case) to the netting set as we deal with large amounts of FxFwds on a day-to-day basis. But the FxFwd example in this post is not very realistic as there is only one FxFwd that matures in 1 year and will not be replaced. Usually, FxFwds are continually rolled over (also known as TARF) in a portfolio but this one FxFwd really twists the netting set NPV while it has not matured. The way I calculated FxFwd exposure at time t is as follows:

forward points = spot rate(t) – forward rate

YF(t) = year fraction from t until maturity of FxFwd, r_dom(t) = domestic currency zero rate

FxFwd NPV(t) = ((spot rate + fwd points) * FxFwd notional)/( 1 + (r_dom(t) * YF(t)))

FxFwd Exposure(t) = (FxFwd notional * spot rate(t)) – FxFwd NPV(t)

The forward rate is the agreed upon rate and spot rate at time t was obtained from FX rate simulation using Garman-Kohlagen process.

The r_dom(t) rate was obtained from the same yield curve simulation  used for swap MtM calculations.

The main stages of our Spark job are as follows:

The Spark driver program loads the netting set instruments, USD swap curve , EUR swap curve, previous swap fixings (as the swaps are “seasoned” and have been traded a year ago) from Amazon S3 bucket.

We determine the dates we want to estimate the netting set NPV based on a weekly collateral schedule plus any swap reset dates.

We calculate today’s netting set NPV which will be the base from which the simulated NPVs will start from.

We will generate a matrix of normally distributed random numbers using Spark Machine Learning library called MLLib which will return a RDD with random numbers distributed across the cluster in the form of partitions.

The work to compute NPVs (Collateralized and Uncollateralized) will be distributed by Spark to available nodes (containers with executor threads) and will be done in parallel.

At the driver, we wait for all the work to be done and then collect the results of all NPV simulations called “NPV Cube” and write them out to a file on Amazon S3 bucket and terminate the cluster.

Another python script which runs on a local machine (not in cluster) then reads from this S3 bucket using boto3 library and visualizes/computes the final quantile calculation.

Let’s dive into the code. All the code and input files used in this post are available at Spark-PFE github repo

Here are the various input files used in this post. Obviously, the swap curves are not real but they are close to real. Libor fixings are real since the data is public domain.

usd-libor-swap-curve.csv , eur-libor-swap-curve.csv , instruments.csv, USD3MLibor-Fixings.csv

from pyspark import SparkConf, SparkContext
from pyspark.mllib.random import RandomRDDs
import QuantLib as ql
import datetime as dt
import numpy as np
import math
import sys

# Used in loading the various input text files
def is_number(s):
        return True
    except ValueError:
        return False

# QuantLib date to Python date
def ql_to_datetime(d):
    return dt.datetime(d.year(), d.month(), d.dayOfMonth())

# Python date to QuantLib date
def py_to_qldate(d):
    return ql.Date(, d.month, d.year)

# Loads libor fixings from input file into an RDD and then collects the results
def loadLiborFixings(libor_fixings_file):

    libor_fixings = sc.textFile(libor_fixings_file) \
        .map(lambda line: line.split(",")) \
        .filter(lambda r: is_number(r[1])) \
        .map(lambda line: (str(line[0]), float(line[1]))).cache()

    fixingDates = r: r[0]).collect()
    fixings = r: r[1]).collect()
    return fixingDates, fixings

# Loads input swap specifications from input file into an RDD and then collects the results
def load_swaps(instruments_file):
    swaps = sc.textFile(instruments_file) \
            .map(lambda line: line.split(",")) \
            .filter(lambda r: r[0] == 'IRS') \
            .map(lambda line: (str(line[1]), str(line[2]), float(line[3]), float(line[4]), str(line[5])))\
    return swaps

# Loads input FxFwd specifications from input file into an RDD and then collects the results
def load_fxfwds(instruments_file):
    fxfwds = sc.textFile(instruments_file) \
        .map(lambda line: line.split(",")) \
        .filter(lambda r: r[0] == 'FXFWD') \
        .map(lambda line: (str(line[1]), str(line[2]), float(line[3]), float(line[4]), str(line[5]), str(line[6])))\
    return fxfwds

# Builds a QuantLib swap object from given specification
def makeSwap(today, start, maturity, nominal, fixedRate, index, typ=ql.VanillaSwap.Payer):
    calendar = ql.UnitedStates()
    fixedLegTenor = ql.Period(6, ql.Months)
    floatingLegBDC = ql.ModifiedFollowing
    fixedLegDC = ql.Thirty360(ql.Thirty360.BondBasis)
    spread = 0.0
    settle_date = calendar.advance(start, 2, ql.Days)
    end = calendar.advance(settle_date, maturity, floatingLegBDC)

    fixedSchedule = ql.Schedule(settle_date,
                                ql.ModifiedFollowing, ql.ModifiedFollowing,
                                ql.DateGeneration.Forward, False)
    floatSchedule = ql.Schedule(settle_date,
    swap = ql.VanillaSwap(typ,
    return swap, [index.fixingDate(x) for x in floatSchedule if index.fixingDate(x) >= today][:-1]

Below is the main method invoked by the spark-submit driver program.

# main method invoked by Spark driver program
def main(sc, args_dict):
    # Broadcast dictionary obejct, which will hold various pure python objects
    # needed by the executors
    # QuantLib SWIG wrappers cannot be pickled and sent to the executotrs
    # so we use broadcast variables in Spark to pass pure python obejcts
    broadcast_dict = {}
    pytoday = dt.datetime(2017, 9, 8)
    broadcast_dict['python_today'] = pytoday
    today = ql.Date(, pytoday.month,pytoday.year)
    ql.Settings.instance().evaluationDate = today

    usd_calendar = ql.UnitedStates()
    usd_dc = ql.Actual365Fixed()
    eurusd_fx_spot = ql.SimpleQuote(1.1856)
    broadcast_dict['eurusd_fx_spot'] = eurusd_fx_spot.value()
    output_dir = args_dict['output_dir']
    instruments_file = args_dict['instruments_file']
    libor_fixings_file = args_dict['libor_fixings_file']

    # Loads USD libor swap curve from input file
    usd_swap_curve = sc.textFile(args_dict['usd_swap_curve_file']) \
        .map(lambda line: line.split(",")) \
        .filter(lambda r: is_number(r[1])) \
        .map(lambda line: (str(line[0]), float(line[1]))) \
    usd_curve_dates = r: r[0]).collect()
    usd_disc_factors = r: r[1]).collect()
    broadcast_dict['usd_curve_dates'] = usd_curve_dates
    broadcast_dict['usd_disc_factors'] = usd_disc_factors
    usd_crv_today = ql.DiscountCurve([ql.DateParser.parseFormatted(x,'%Y-%m-%d')
                                      for x in usd_curve_dates] , usd_disc_factors, usd_dc, usd_calendar )
    usd_disc_term_structure = ql.RelinkableYieldTermStructureHandle(usd_crv_today)

    # Loads EUR libor swap curve from input file
    eur_swap_curve = sc.textFile(args_dict['eur_swap_curve_file']) \
        .map(lambda line: line.split(",")) \
        .filter(lambda r: is_number(r[1])) \
        .map(lambda line: (str(line[0]), float(line[1]))) \
    eur_curve_dates = r: r[0]).collect()
    eur_disc_factors = r: r[1]).collect()
    broadcast_dict['eur_curve_dates'] = eur_curve_dates
    broadcast_dict['eur_disc_factors'] = eur_disc_factors

    # Build QuantLib index object
    usdlibor3m = ql.USDLibor(ql.Period(3, ql.Months), usd_disc_term_structure)
    # don't need EUR fixings since we don't have a EUR swap
    fixingDates, fixings = loadLiborFixings(libor_fixings_file)
    fixingDates = [ql.DateParser.parseFormatted(r, '%Y-%m-%d') for r in fixingDates]
    three_month_old_date = usd_calendar.advance(today, -90, ql.Days, ql.ModifiedFollowing)
    latestFixingDates = fixingDates[fixingDates.index(three_month_old_date):]
    latestFixings = fixings[fixingDates.index(three_month_old_date):]
    usdlibor3m.addFixings(latestFixingDates, latestFixings)
    broadcast_dict['fixing_dates'] = [ql_to_datetime(x) for x in latestFixingDates]
    broadcast_dict['fixings'] = latestFixings

    swaps = load_swaps(instruments_file)
    broadcast_dict['swaps'] = swaps
    fxfwds = load_fxfwds(instruments_file)
    broadcast_dict['fxfwds'] = fxfwds

    swaps = [
                makeSwap(today, ql.DateParser.parseFormatted(swap[0], '%Y-%m-%d'),
                     ql.Period(swap[1]), swap[2], swap[3], usdlibor3m,
                     ql.VanillaSwap.Payer if swap[4] == 'Payer' else ql.VanillaSwap.Receiver)
                for swap in swaps

    longest_swap_maturity = max([s[0].maturityDate() for s in swaps])
    broadcast_dict['longest_swap_maturity'] = ql_to_datetime(longest_swap_maturity)

    Nsim = int(args_dict['NSim'])
    NPartitions = int(args_dict['NPartitions'])
    a = float(args_dict['a']) #0.376739
    sigma = float(args_dict['sigma']) #0.0209835
    broadcast_dict['a'] = a
    broadcast_dict['sigma'] = sigma

    # Simulate swap NPVs until we reach the longest maturity
    years_to_sim = math.ceil(ql.Actual360().yearFraction(today,longest_swap_maturity))

    dates = [today + ql.Period(i, ql.Weeks) for i in range(0, 52 * int(years_to_sim))]
    # Very important to add swap reset dates to our universe of dates
    for idx in xrange(len(swaps)):
        dates += swaps[idx][1]
    dates = np.unique(np.sort(dates))
    broadcast_dict['dates'] = [ql_to_datetime(x) for x in dates]

    br_dict = sc.broadcast(broadcast_dict)

    # Write out the time grid to a text file which can be parsed later
    T = [ql.Actual360().yearFraction(today, dates[i]) for i in xrange(1, len(dates))]
    temp_rdd = sc.parallelize(T)
    # coalesce with shrink the partition size to 1 so we have only 1 file to write and parse later
    temp_rdd.coalesce(1).saveAsTextFile(output_dir +'/time-grid')

    # the main routine, generate a matrix of normally distributed random numbers and
    # spread them across the cluster
    # we are not setting the number of partitions here as the default parallelism should be enough
    randArrayRDD = RandomRDDs.normalVectorRDD(sc, Nsim, len(T), seed=1L)

    # for each row of the matrix, which corresponds to one simulated path,
    # compute netting set NPV (collateralized and uncollateralized)
    npv_cube = p: (calc_exposure(p,T,br_dict)))
    # write out the npv cube, remove '[' and ']' added by numpy array to ease parsing later
    npv_cube.coalesce(1).saveAsTextFile(output_dir + '/npv_cube')

Below is the method that will be executed on the executors in parallel on the EMR cluster.

def calc_exposure(rnumbers, time_grid, br_dict):

    # get hold of broadcasted dictionary and rebuild the curves, libor index, swaps and FxFwd
    # we need to do this since QuantLib SWIG objects cannot be passed around

    broadcast_dict = br_dict.value
    today = py_to_qldate(broadcast_dict['python_today'])
    usd_calendar = ql.UnitedStates()
    usd_dc = ql.Actual365Fixed()
    eur_calendar = ql.TARGET()
    eur_dc = ql.ActualActual()

    maturity = 10
    a = broadcast_dict['a']
    sigma = broadcast_dict['sigma']
    dates = [py_to_qldate(x) for x in broadcast_dict['dates']]
    longest_swap_maturity = py_to_qldate(broadcast_dict['longest_swap_maturity'])
    fixing_dates = [py_to_qldate(x) for x in broadcast_dict['fixing_dates']]
    fixings = broadcast_dict['fixings']

    usd_curve_dates = broadcast_dict['usd_curve_dates']
    usd_disc_factors = broadcast_dict['usd_disc_factors']
    usd_t0_curve = ql.DiscountCurve([ql.DateParser.parseFormatted(x, '%Y-%m-%d')
                        for x in usd_curve_dates], usd_disc_factors, usd_dc, usd_calendar)
    usd_disc_term_structure = ql.RelinkableYieldTermStructureHandle(usd_t0_curve)

    eur_curve_dates = broadcast_dict['eur_curve_dates']
    eur_disc_factors = broadcast_dict['eur_disc_factors']
    eur_t0_curve = ql.DiscountCurve([ql.DateParser.parseFormatted(x, '%Y-%m-%d')
                                     for x in eur_curve_dates], eur_disc_factors, eur_dc, eur_calendar)
    eur_disc_term_structure = ql.RelinkableYieldTermStructureHandle(eur_t0_curve)

    eurusd_fx_spot = broadcast_dict['eurusd_fx_spot']
    flat_vol_hyts = ql.BlackVolTermStructureHandle(ql.BlackConstantVol(today, usd_calendar, 0.20, usd_dc))
    usdlibor3m = ql.USDLibor(ql.Period(3, ql.Months), usd_disc_term_structure)
    usdlibor3m.addFixings(fixing_dates, fixings)
    engine = ql.DiscountingSwapEngine(usd_disc_term_structure)
    swaps = broadcast_dict['swaps']
    swaps = [
        makeSwap(today, ql.DateParser.parseFormatted(swap[0], '%Y-%m-%d'),
                 ql.Period(swap[1]), swap[2], swap[3], usdlibor3m,
                 ql.VanillaSwap.Payer if swap[4] == 'Payer' else ql.VanillaSwap.Receiver)
        for swap in swaps

    # I know there is only one fxfwd in the instruments file
    fxfwds = broadcast_dict['fxfwds']
    fx_startdate = ql.DateParser.parseFormatted(fxfwds[0][0], '%Y-%m-%d')
    fx_tenor = ql.Period(fxfwds[0][1])
    fx_notional = fxfwds[0][2]
    fwd_rate = fxfwds[0][3]
    fx_maturity = usd_calendar.advance(fx_startdate, fx_tenor, ql.Following)

    # define the NPV cube array
    # number of rows = number of dates in time-grid
    # number of columns = 2 (collateralized and uncollateralized NPV)
    npvMat = np.zeros((len(time_grid),2))

    # utility method to calc FxFwd exposure
    def fxfwd_exposure(date, spot, usd_curve):
        usd_zero_rate = usd_curve.zeroRate(usd_dc.yearFraction(date, fx_maturity),
                                           ql.Compounded, ql.Annual).rate()
        yf = usd_dc.yearFraction(date, fx_maturity)
        fwd_points = fwd_rate - spot
        fxfwd_npv = ((spot + fwd_points) * fx_notional) / (1 + (usd_zero_rate * yf))
        fxfwd_exp = (fx_notional * eurusd_fx_spot) - fxfwd_npv
        return fxfwd_exp

    # Intialize NPV cube with today's NPVs
    nettingset_npv = 0.
    for idx in xrange(len(swaps)):
        nettingset_npv += swaps[idx][0].NPV()
    fxfwd_exp = fxfwd_exposure(today, eurusd_fx_spot, usd_t0_curve)
    nettingset_npv += fxfwd_exp
    npvMat[0] = nettingset_npv

    # Hull White parameter estimations
    def gamma(t):
        forwardRate = usd_t0_curve.forwardRate(t, t, ql.Continuous, ql.NoFrequency).rate()
        temp = sigma * (1.0 - np.exp(- a * t)) / a
        return forwardRate + 0.5 * temp * temp

    def B(t, T):
        return (1.0 - np.exp(- a * (T - t))) / a

    def A(t, T):
        forward = usd_t0_curve.forwardRate(t, t, ql.Continuous, ql.NoFrequency).rate()
        value = B(t, T) * forward - 0.25 * sigma * B(t, T) * sigma * B(t, T) * B(0.0, 2.0 * t)
        return np.exp(value) * /

    usd_rmat = np.zeros(shape=(len(time_grid)))
    usd_rmat[:] = usd_t0_curve.forwardRate(0,0, ql.Continuous, ql.NoFrequency).rate()

    eur_rmat = np.zeros(shape=(len(time_grid)))
    eur_rmat[:] = eur_t0_curve.forwardRate(0, 0, ql.Continuous, ql.NoFrequency).rate()

    spotmat = np.zeros(shape=(len(time_grid)))
    spotmat[:] = eurusd_fx_spot

    # CSA terms for Collateral movements
    IA1 = 0; IA2 = 0; threshold1 = 50000.0; threshold2 = 50000.0; MTA1 = 25000.0; MTA2 = 25000;rounding = 5000.0
    collateral_held = IA2 - IA1
    collateral_posted = 0.0

    # the main loop of NPV computations
    for iT in xrange(1, len(time_grid)):
        mean = usd_rmat[iT - 1] * np.exp(- a * (time_grid[iT] - time_grid[iT - 1])) + \
               gamma(time_grid[iT]) - gamma(time_grid[iT - 1]) * \
                                        np.exp(- a * (time_grid[iT] - time_grid[iT - 1]))

        var = 0.5 * sigma * sigma / a * (1 - np.exp(-2 * a * (time_grid[iT] - time_grid[iT - 1])))
        rnew = mean + rnumbers[iT-1] * np.sqrt(var)
        usd_rmat[iT] = rnew
        # USD discount factors as generated by HW model
        usd_disc_factors = [1.0] + [A(time_grid[iT], time_grid[iT] + k) *
                                np.exp(- B(time_grid[iT], time_grid[iT] + k) * rnew) for k in xrange(1, maturity + 1)]

        mean = eur_rmat[iT - 1] * np.exp(- a * (time_grid[iT] - time_grid[iT - 1])) + \
               gamma(time_grid[iT]) - gamma(time_grid[iT - 1]) * \
                                      np.exp(- a * (time_grid[iT] - time_grid[iT - 1]))

        var = 0.5 * sigma * sigma / a * (1 - np.exp(-2 * a * (time_grid[iT] - time_grid[iT - 1])))
        rnew = mean + rnumbers[iT - 1] * np.sqrt(var)
        eur_rmat[iT] = rnew
        # EUR discount factors as generated by HW model
        eur_disc_factors = [1.0] + [A(time_grid[iT], time_grid[iT] + k) *
                                    np.exp(- B(time_grid[iT], time_grid[iT] + k) * rnew) for k in xrange(1, maturity + 1)]

        if dates[iT] > longest_swap_maturity:

        # very important to reset the valuation date
        crv_date = dates[iT]
        crv_dates = [crv_date] + [crv_date + ql.Period(k, ql.Years) for k in xrange(1, maturity + 1)]
        # use the new disc factors to build a new simulated curve
        usd_crv = ql.DiscountCurve(crv_dates, usd_disc_factors, ql.Actual365Fixed(), ql.UnitedStates())
        eur_crv = ql.DiscountCurve(crv_dates, eur_disc_factors, ql.ActualActual(), ql.TARGET())
        usdlibor3m.addFixings(fixing_dates, fixings)
        swap_engine = ql.DiscountingSwapEngine(usd_disc_term_structure)

        # build Garman-Kohlagen process for FX rate simulation for FxFwd
        gk_process = ql.GarmanKohlagenProcess(ql.QuoteHandle(ql.SimpleQuote(eurusd_fx_spot)),
                                              usd_disc_term_structure, eur_disc_term_structure, flat_vol_hyts)
        dt = time_grid[iT] - time_grid[iT - 1]
        spotmat[iT] = gk_process.evolve(time_grid[iT - 1], spotmat[iT - 1], dt, rnumbers[iT - 1])
        nettingset_npv = 0.
        for s in range(len(swaps)):
            if usdlibor3m.isValidFixingDate(dates[iT]):
                fixing = usdlibor3m.fixing(dates[iT])
                usdlibor3m.addFixing(dates[iT], fixing)
            nettingset_npv += swaps[s][0].NPV()

        if dates[iT] <= fx_maturity:
            fxfwd_exp = fxfwd_exposure(dates[iT], spotmat[iT], usd_crv)
            nettingset_npv += fxfwd_exp

        # we have uncollateralized netting set NPV
        npvMat[iT,0] = nettingset_npv
        prev_exposure = npvMat[iT-1,1]
        print 'prev_exposure:%f' % prev_exposure
        collateral_held = collateral_held + collateral_posted
        print 'collateral_held:%f' % collateral_held
        nettingset_npv = nettingset_npv + IA2 - IA1
        print 'nettingset_npv:%f' % nettingset_npv
        collateral_required = max(nettingset_npv - threshold2, 0) \
                              - max(-nettingset_npv - threshold1, 0) - collateral_held
        print 'collateral_required:%f' % collateral_required
        collateral_posted = collateral_required
        if collateral_posted > 0:
            if collateral_posted < MTA2:
                collateral_posted = 0.
        elif -collateral_posted < MTA1:
            collateral_posted = 0.
        print 'collateral_posted:%f' % collateral_posted
        if collateral_posted <> 0. and rounding <> 0.:
            collateral_posted = math.ceil(collateral_posted/rounding) * rounding
        print 'collateral_posted after rounding:%f' % collateral_posted
        if collateral_posted < 0:
            collateral_held = collateral_held + collateral_posted
            collateral_posted = 0.
        print 'collateral_held:%f' % collateral_held
        # we have collateralized netting set NPV
        npvMat[iT,1] = nettingset_npv - collateral_held
        print 'npvMat:%f' % npvMat[iT,1]
        print '========================================='
    return npvMat

Below is the driver program initialization and argument parsing.

if __name__ == "__main__":

    if len(sys.argv) != 10:
        print('Usage: ' + sys.argv[0] + ' <num_of_simulations> <num_of_partitions> <hull_white_a> <hull_white_sigma> <usd_swap_curve><eur_swap_curve><libor_fixings><instruments><output_dir>')
    conf = SparkConf().setAppName("SPARK-PFE")
    sc  = SparkContext(conf=conf)

    args = {'NSim':sys.argv[1], 'NPartitions': sys.argv[2], 'a': sys.argv[3], 'sigma':sys.argv[4],
            'usd_swap_curve_file': sys.argv[5], 'eur_swap_curve_file': sys.argv[6], 'libor_fixings_file': sys.argv[7],
            'instruments_file': sys.argv[8], 'output_dir': sys.argv[9]}

    main(sc, args)

The spark-submit script used to submit this job looks as follows:

PYSPARK_DRIVER_PYTHON=/opt/anaconda/anaconda2/bin/python \
PYSPARK_PYTHON=/opt/anaconda/anaconda2/bin/python \
spark-submit \
--deploy-mode cluster \
--master yarn \
--conf spark.driver.extraLibraryPath="${LD_LIBRARY_PATH}" \
--conf spark.executorEnv.LD_LIBRARY_PATH="${LD_LIBRARY_PATH}"  \
--num-executors 14 \
--conf spark.yarn.executor.memoryOverhead=5120 \
--executor-memory 43G \
--conf spark.yarn.driver.memoryOverhead=5120 \
--driver-memory 43G \
--executor-cores 12 \
--driver-cores 3 \
--conf spark.default.parallelism=168 \ 5000 48 0.376739 0.0209835 \
s3://spark-pfe/usd-libor-swap-curve.csv \
s3://spark-pfe/eur-libor-swap-curve.csv \
s3://spark-pfe/USD3MLibor-Fixings.csv \
s3://spark-pfe/instruments.csv \

After the spark job completes, the output files which are time-grid array and NPV cube are stored in an S3 bucket. We will use another Python script to pull the files and visualize the PFE which will be the subject of Part II of this post.