Java Application Performance

JIT Compilation and CodeCache

The JVM interprets bytecode, and JIT compilation compiles frequently run code to native machine code. To see which code blocks or methods are JIT compiled, use the -XX:+PrintCompilation flag:

java -XX:+PrintCompilation Main 5000

The first column is the number of milliseconds since the VM started. The second is the order in which the code block/method was compiled (some compilations take longer than others, so the output is not strictly ordered). The third column can contain attribute characters: n means a native method, s a synchronized method, ! a method with exception handlers, and % on-stack replacement, where running code is swapped for its natively compiled version held in the code cache (the most optimal case). The last number shows the tier of compilation.

The C1 compiler produces native code at levels 1, 2 and 3 (each progressively more profiled), and the C2 compiler produces level 4, which is placed in the code cache. The VM profiles the code and selects the appropriate level.

If you want to log the compilation output to a file, for example when you do not have access to the console on the remote machine, use the following command:

java -XX:+UnlockDiagnosticVMOptions -XX:+LogCompilation Main 5000

To see the size of the CodeCache, use the -XX:+PrintCodeCache JVM flag. In this example it was set to around 120 MB.

In 64-bit Java 8, the CodeCache can grow up to 240 MB. To change the CodeCache size, use:

InitialCodeCacheSize – when the application starts

ReservedCodeCacheSize – Max Size of CodeCache

CodeCacheExpansionSize – how quickly CodeCache should grow

You can also use JConsole to check the CodeCache size at run time, but keep in mind that JConsole itself adds roughly 2 MB of CodeCache usage, needed for the JVM to communicate with JConsole.

Selecting JVM

On a 64-bit OS, you can run either the 32-bit or the 64-bit JVM.

You choose the 32-bit client or server VM at runtime using the "-client" or "-server" flag, and "-d64" for the 64-bit server VM. However, these flags do not always work as expected on different OSs.

To turn off tiered compilation, use the -XX:-TieredCompilation flag, although there is rarely a good reason to do so.

To see how many threads are available for native compilation of the code, use jinfo (PID is the Java process id):

jinfo -flag CICompilerCount PID

To change the number of threads available for native compilation

java -XX:CICompilerCount=6 -XX:+PrintCompilation Main 5000

To control how many times a method must run before the JVM decides to compile it, use the -XX:CompileThreshold flag, for example:

java -XX:CompileThreshold=1000 -XX:+PrintCompilation Main 5000


The Stack and the Heap

Stack – local primitives and references; Heap – objects, including Strings. Every thread in a Java application has its own stack, but all threads share the heap. The pointer to an object on the heap is stored on the stack.

For objects passed into methods, the REFERENCE to the object is passed BY VALUE. Primitives are also passed by value: a copy of the variable is made on the stack and that copy is passed to the method.
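A minimal sketch of this behavior (the method and variable names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class PassByValueDemo {

    // The parameter holds a copy of the caller's reference; re-pointing it
    // changes only the local copy, so the caller's variable is unaffected.
    static void reassign(List<String> list) {
        list = new ArrayList<>();
        list.add("ignored");
    }

    // Both the caller and this method point at the same heap object,
    // so a mutation through the copied reference is visible to the caller.
    static void mutate(List<String> list) {
        list.add("visible");
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        reassign(names);
        System.out.println(names.size()); // prints 0
        mutate(names);
        System.out.println(names.size()); // prints 1
    }
}
```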

Final keyword – the reference cannot be re-assigned to point to a different object, but the properties of the object it points to can still be changed.

final Customer c = new Customer("Susan");
c = new Customer("Danny"); // not allowed
c.setName("Danny"); //allowed

Escaping References

When you return a collection from a getter method, that reference has escaped: the caller could call clear() on the collection, which was never the intent of returning it.

To fix this,

  1. Have the class implement the Iterable interface and provide an iterator() method. The caller can then loop through, e.g., Customer records. It is still possible to delete Customer objects from the collection, but harder.
  2. Return a copy of the collection in the getter, e.g. return new ArrayList<>(myList) – involves copying large collections.
  3. Return Collections.unmodifiableList(myList) (available in all versions) or List.copyOf(myList) (Java 10+) – best solution.
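Options 2 and 3 can be sketched as follows; the CustomerRecords class and its String-based list are illustrative, not from the original:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CustomerRecords {

    private final List<String> customers = new ArrayList<>();

    public void addCustomer(String name) {
        customers.add(name);
    }

    // Option 2: defensive copy. Safe, but copies the whole list on every call.
    public List<String> getCustomersCopy() {
        return new ArrayList<>(customers);
    }

    // Option 3: unmodifiable view. No copying; any attempt to mutate the
    // returned list throws UnsupportedOperationException at run time.
    public List<String> getCustomers() {
        return Collections.unmodifiableList(customers);
    }
}
```

Calling clear() on the list returned by getCustomers() throws UnsupportedOperationException, so the internal collection can no longer be emptied through the escaped reference.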

If you are returning a custom object (e.g. Customer), add a copy constructor on the object so that the getter returns a copy. But the caller will not know whether they are modifying a copy or the original, which can be misleading. Better way – have Customer implement a ReadonlyCustomer interface that omits the setName method and return a ReadonlyCustomer from findCustomer(), so the caller gets a compile-time error on setName.
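A sketch of the read-only interface approach (the interface and class names follow the text; the nesting is only to keep the example self-contained):

```java
public class ReadOnlyDemo {

    // Exposes only the read methods; setName is deliberately absent.
    interface ReadonlyCustomer {
        String getName();
    }

    static class Customer implements ReadonlyCustomer {
        private String name;

        Customer(String name) { this.name = name; }

        public String getName() { return name; }

        public void setName(String name) { this.name = name; }
    }

    // Callers receive the read-only view of the customer.
    static ReadonlyCustomer findCustomer(Customer c) {
        return c;
    }

    public static void main(String[] args) {
        ReadonlyCustomer view = findCustomer(new Customer("Susan"));
        System.out.println(view.getName());
        // view.setName("Danny"); // compile-time error: not part of ReadonlyCustomer
    }
}
```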

The Metaspace (java 8 and above)

Besides the stacks and the heap, the JVM maintains an area called the Metaspace. It holds class metadata, such as which methods have been compiled to what level, and all static variables (primitives and references).

In Java 7 and below there was the PermGen space, which was removed in Java 8. PermGen had a fixed maximum size and could run out of memory; the metaspace grows dynamically by default and is limited only by available native memory (unless capped with -XX:MaxMetaspaceSize).

The String Pool

Small strings are placed into a pool by the JVM, so the behavior is as follows:

String one = "hello"; String two = "hello";
one.equals(two); // always true
one == two; // true 

However, strings that are computed at runtime are not placed into the pool:

String three = new Integer(76).toString();
String four = "76";
three == four; // not true

But we can use the intern() method to ask the JVM to place the string in the pool (or return the pooled instance if it is already there).
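A small demonstration of the pool behavior described above:

```java
public class InternDemo {
    public static void main(String[] args) {
        String four = "76";                            // literal: placed in the pool
        String three = Integer.valueOf(76).toString(); // computed at runtime: not pooled
        System.out.println(three == four);             // false: two different objects
        System.out.println(three.intern() == four);    // true: intern() returns the pooled instance
        System.out.println(three.equals(four));        // true: contents are equal either way
    }
}
```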

The string pool is a hashmap; strings are stored by their hash codes. To see the number of buckets in the pool, use the -XX:+PrintStringTableStatistics flag:

java -XX:+PrintStringTableStatistics Main 5000

In this example there were already 65,536 buckets with 1,732 strings in the pool; these are all core Java library strings.

To set the size of the string pool, use -XX:StringTableSize=n, where n should ideally be a large prime number.

Setting up the Heap Size

-XX:MaxHeapSize=600m or -Xmx600m => sets the max heap to 600 MB

-XX:InitialHeapSize=1g or -Xms1g => sets the initial heap to 1 GB

Monitoring the Heap

Java VisualVM – to monitor the heap over time and see garbage collection cycles

To see which objects are occupying the heap, use Memory Analyzer (MAT)

Generational Garbage Collection

Removing objects from the heap which are no longer reachable

General mechanism – mark and sweep. Instead of looking for all the objects to remove, the GC looks for all the objects to retain and rescues them. Marking: program execution is stopped and all threads are paused; the GC checks every variable on the stacks and in the metaspace and follows its references, so every reachable object is marked as alive. Sweeping: any object not marked during the marking phase can be freed, and the live objects are moved into a contiguous block of memory to avoid fragmentation.

Since the GC looks for live objects rather than garbage, the more garbage there is, the faster the collection: there is less to mark.

Most objects don't live for long, and if an object survives one GC, it is likely to live for a long time. The heap is therefore organized into 2 sections: the young generation and the old generation. The young generation is smaller than the old and is further divided into 3 sections. Marking the young generation is quick because it is small, so the application freeze goes unnoticed. Surviving objects are moved to the old generation and new objects are added to the young. GC of the young generation is known as a "minor collection"; a running app will have many minor collections and few major (old generation) collections.

Java 8 – the young generation is split into Eden and the S0 and S1 survivor spaces. When the app starts, all 3 spaces are empty. New objects are placed in Eden; when it fills up (which happens quickly), the GC runs on Eden and moves any surviving objects to either S0 or S1. The next time, the GC looks at Eden plus one survivor space and moves the survivors to the other survivor space. An object that has been swapped between S0 and S1, say, 5 times is 5 generations old. After a certain threshold, the object is moved to the old generation.

To monitor when GC is taking place, use -verbose:gc

Tuning Garbage Collection

Young generation garbage collection is preferred over old generation garbage collection, as the pause is short enough to go unnoticed.


-XX:NewRatio=n – how many times bigger the old generation should be compared to the young generation. If n = 4 and the heap is 10 MB, the old generation gets 8 MB and the young generation 2 MB. To see the default value, find the process id and run "jinfo -flag NewRatio PID"; usually n = 2 by default. To increase the young generation and shrink the old, n has to be lower; it must be a whole number, so the only choice is n = 1, which divides old and young equally.


-XX:SurvivorRatio=n – how much of the young generation is taken by the survivor spaces S0 and S1, the rest being Eden. The default (via jinfo) is 8, which sets the ratio of Eden to each survivor space to 8:1, so S0 and S1 each take about 1/10 of the young generation and Eden the remaining 80%. Lowering the value gives the survivor spaces a larger share of the young generation.


-XX:MaxTenuringThreshold=n – how many collections an object must survive before it becomes part of the old generation. We want objects to stay in the young generation as long as possible, where collection is cheap. The default is 15, which is also the maximum value for this flag.

Selecting a garbage collector

The default collector changed in Java 9. In Java 8, there are 3 types of collectors:

Serial – uses a single thread for all garbage collection; use -XX:+UseSerialGC

Parallel – uses multiple threads for minor (young gen) collections; use -XX:+UseParallelGC. This is the default GC for Java 8 and below.

Mostly Concurrent – the closest thing to a real-time GC; the application is paused during the mark phase but not during the sweep phase. There are 2 concurrent collectors:

Concurrent Mark Sweep (CMS) Collector – -XX:+UseConcMarkSweepGC (deprecated since Java 9)

G1 Collector – the default from Java 9 onwards; enable explicitly with -XX:+UseG1GC. G1 works very differently: the heap is split into around 2048 regions, and regions are allocated to the different parts of the heap (S0, S1, Eden and Old). After each minor GC, the number of regions allocated to each part of the young generation is re-adjusted to what the JVM thinks is optimal, so an S0 region might be re-allocated to Eden, a previously unallocated region might be allocated to Eden, and so on. During a full GC, G1 looks for the old regions that are mostly garbage and collects those first; that's why it is called G1, the "garbage first" collector. Clearing a few old regions might be enough instead of a full collection, so G1's performance should be better.

You should not have to tune G1 collector but here are the flags

-XX:ConcGCThreads=n – the number of threads available for smaller regional collections

-XX:InitiatingHeapOccupancyPercent=n – default 45%, G1 runs when the entire heap (old, Survivor and Eden combined) is 45% full

-XX:+UseStringDeduplication (G1 only) – allows the collector to reclaim space by de-duplicating identical strings on the heap

Java Mission Control

Using a profiler – is the CPU usage too high? Network or disk usage? To find out what's going on inside the JVM, use a profiler application. JProfiler and YourKit are commercial profilers; JMC is open source.

MBean Server – same as JMX console mode; shows live metrics

Flight Recorder – historical performance data, or the run-up to an application crash

Live Set + Fragmentation – an important dial; shows the size of the heap after GC. A value staying close to 100% indicates a memory leak.

To use flight recorder, for Oracle JDK, -XX:+UnlockCommercialFeatures -XX:+FlightRecorder, for OpenJDK, only use -XX:+FlightRecorder

To start flight recording on the command line:

java -XX:StartFlightRecording=delay=2min,duration=60s,filename=recording.jfr Main 5000

This starts recording 2 minutes after the app starts and records for 60 seconds.

Assessing Performance

Timing a piece of code yourself (a microbenchmark: a single method or code block) has complications:

Native compilation – comparing 2 versions of code when only one has been natively compiled gives false results

Garbage collection – takes place while our code is running

Assessing in isolation – the benchmarked code does not run as part of the full application, where it would compete for resources

Different hardware – benchmarked on dev machine, different hardware on prod machine

System.currentTimeMillis() – to measure the elapsed time

Add a warm-up loop calling the same method many times so that native compilation takes place before you start measuring, then add the -XX:+PrintCompilation flag to confirm the method was natively compiled during the warm-up.

Use -XX:CompileThreshold=1000 so the method gets natively compiled sooner during warm-up; the default is 10000.
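A hedged sketch of this warm-up pattern; the compute method and the loop counts are arbitrary choices for illustration:

```java
public class WarmupBenchmark {

    // Some work to measure; the body is arbitrary.
    static int compute(int n) {
        int total = 0;
        for (int i = 0; i < n; i++) {
            total += i % 7;
        }
        return total;
    }

    public static void main(String[] args) {
        // Warm-up phase: run the method enough times for the JIT to compile it.
        // Run with -XX:+PrintCompilation to confirm, and optionally
        // -XX:CompileThreshold=1000 to trigger compilation sooner.
        for (int i = 0; i < 20_000; i++) {
            compute(1_000);
        }

        // Timed phase: measure only after compilation has settled.
        long start = System.currentTimeMillis();
        for (int i = 0; i < 20_000; i++) {
            compute(1_000);
        }
        System.out.println("Elapsed ms: " + (System.currentTimeMillis() - start));
    }
}
```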

Java Microbenchmark Harness (JMH) – sets up the warm-up time and analyzes performance in a more production-like environment; it runs the code thousands of times to produce a summary output.

Add the @Benchmark annotation on top of the method you want to measure, run mvn clean install on the JMH source code, and run the generated benchmarks.jar runnable jar.

Adding "-bm avgt" reports the average time taken to run the benchmarked code.

Lists in Java

There are 8 different types of Lists in Java:

AttributeList – only for MBean objects, not a generic list

RoleList, RoleUnresolvedList – only for Role objects, not generic types

CopyOnWriteArrayList – efficient when the list is mostly traversed and rarely mutated. Used in multi-threaded applications where multiple threads access the same list with lots of iteration and reads but very few writes/additions/removals.

ArrayList – backed by an array; the list size starts at zero but internal storage for 10 elements is allocated on the heap. When full, the capacity grows to current_size + (current_size >> 1), i.e. roughly 1.5x.

Vector – around since Java 1.0 and kept for backwards compatibility; Vector is thread-safe, which comes at a performance cost

Stack – a child of Vector, LIFO; prefer LinkedList instead

LinkedList – implements the List and Deque (double-ended queue) interfaces; each node has pointers to the previous and next nodes

Adding an item to the end of the list – an ArrayList might need resizing; a LinkedList will always be fast, since Java maintains a pointer to the last node and can go straight to the end

Adding an item to the start of the list – a LinkedList will be quick, but not an ArrayList: all existing items need to be shifted to the right

Removing an item – ArrayList: all items after it must be shifted to the left; LinkedList: just change the pointers on either side of the item, but the item must first be found. get() is faster in an ArrayList than in a LinkedList.
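The difference can be seen with a rough timing sketch (exact numbers vary by machine; this is illustrative, not a proper benchmark):

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class ListTiming {

    // Inserting at index 0 forces an ArrayList to shift every element right,
    // while a LinkedList only rewires the pointers at the head.
    static long timeAddFirst(List<Integer> list, int n) {
        long start = System.currentTimeMillis();
        for (int i = 0; i < n; i++) {
            list.add(0, i);
        }
        return System.currentTimeMillis() - start;
    }

    public static void main(String[] args) {
        int n = 100_000;
        System.out.println("ArrayList:  " + timeAddFirst(new ArrayList<>(), n) + " ms");
        System.out.println("LinkedList: " + timeAddFirst(new LinkedList<>(), n) + " ms");
    }
}
```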

Maps in Java

It takes the same amount of time to retrieve an item from a hashmap of 10 items as from a hashmap of a billion items.

When you create a hashmap, an initial array of 16 buckets is allocated.

The key is always converted to an integer using its hash code:

bucket = hashcode % number of buckets (% is modulo)

System.out.println("Little Women".hashCode()); // 675377748

675377748 % 16 = 4

So the object will be stored in bucket# 4.

There could be many objects stored in the same bucket. Each bucket holds a linked list of objects, and a new object is appended to the end of that list.

Hashmaps have a load factor, default 0.75 (3/4). Once 3/4 of the buckets have one or more elements in them, the hashmap is considered to be getting full and it doubles its capacity. When it grows, all the items need to be rehashed and new bucket numbers calculated, which is significant overhead.

You can specify the initial capacity and load factor when creating the HashMap.
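For example, if roughly 1000 entries are expected (the numbers here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class MapSizing {
    public static void main(String[] args) {
        // 1000 expected entries / 0.75 load factor means about 1334 buckets
        // are needed, so the map never has to resize and rehash while filling.
        Map<String, Integer> scores = new HashMap<>(1334, 0.75f);
        for (int i = 0; i < 1000; i++) {
            scores.put("key" + i, i);
        }
        System.out.println(scores.size()); // prints 1000
    }
}
```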

Rules for hashCode() in custom objects – it should produce a good range of numbers so that objects are spread across different buckets, and equal objects must have equal hash codes.

When we iterate through a HashMap, we get the items back in no particular order. A LinkedHashMap preserves insertion order: it maintains an additional linked list running across the buckets, and iteration returns items in the order they were added.
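A quick illustration of the ordering difference:

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class OrderDemo {
    public static void main(String[] args) {
        Map<String, Integer> linked = new LinkedHashMap<>();
        linked.put("first", 1);
        linked.put("second", 2);
        linked.put("third", 3);
        // Iteration follows insertion order: [first, second, third]
        System.out.println(linked.keySet());

        // A plain HashMap iterates in an order driven by the keys' hash codes
        Map<String, Integer> plain = new HashMap<>(linked);
        System.out.println(plain.keySet());
    }
}
```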


To use the GraalVM compiler with OpenJDK 11 (Linux only):

java -XX:+UnlockExperimentalVMOptions -XX:+EnableJVMCI -XX:+UseJVMCICompiler Main 5000




Distributed QuantLib using AWS Lambda

Here I present a proof of concept for running QuantLib functions in AWS Lambda.

AWS Lambda offers an exciting way to leverage distributed computing without worrying about infrastructure or server provisioning: all you need to do is upload your Lambda function and trigger it using one of the supported triggers. It automatically scales to the size of your workload, and you pay only for the time your code runs, billed in 100 ms increments.

Your AWS Lambda function runs in Amazon's custom Linux environment, which is available as a machine image called the Amazon Linux AMI. You must first compile QuantLib in Amazon's Linux environment. You can either spin up an EC2 instance to do that or download and host the VirtualBox image of Amazon's Linux environment; I went the second route. Out of the box, Amazon Linux is pretty bare bones: you need to install the "Development Tools" package and upgrade Python to 3.7. From then on, it's compiling QuantLib as usual and creating the Python bindings.

When you upload your Lambda function to AWS, you have two choices for your dependencies: package them along with your function, or use Lambda Layers. The advantage of Layers is that you can package your dependencies separately and keep reusing the layer across functions. It also reduces your function package size, which is limited (250 MB). Keep in mind that layers are zip files and will be unzipped by the AWS environment before your function runs. Since QuantLib compiles into .so files, we need to make sure they are unzipped into a path on the Lambda instance where the Lambda runtime can find them.

I have already gone through this exercise, created the Layers and made them public, as shown in the table below. You will need to reference their ARNs (Amazon Resource Names) if you want to add these layers to your Lambdas. I have also used a publicly available NumPy layer from Klayers in my Lambda functions to provide numpy functionality.


If you are developing Python 3.7 Lambda function, you will need to include both the python layers from the above table.

Now comes the fun part of writing the Lambda functions. For this demo, I am going to borrow from Mikael Katajamäki's excellent blog at Exposure Simulation, which shows swap exposure simulations for computing expected positive exposure using the Hull-White one-factor model. On my laptop, the script takes about 7 and a half minutes to compute swap NPVs for 500 paths and 262 dates for one swap. I have re-engineered the original code into one "worker" function that takes a path and computes swap NPVs over the date grid, with a simulation time step of 1 week. A "controller" Lambda generates the 500 random paths based on the QuantLib HullWhiteProcess and then calls the worker Lambdas asynchronously. A local script running on my laptop uses the boto3 library to call the controller Lambda and kick off the entire workflow. Each worker Lambda writes the NPVs it has calculated to a file in an S3 bucket that is specified as an environment variable to the Lambda. For the purposes of this demo, the local script keeps polling that S3 bucket; once 500 files are available (one per path), the script assumes all the work is done and aggregates the results. I am sure there are better ways to signal when all the worker Lambdas have finished, perhaps using DynamoDB atomic counters or SQS/SNS queues.

To aid in passing required parameters from controller to worker lambda, I save the intermediate data to a file on S3. For example, dates and discount factors needed to build market term structure are loaded in from S3 bucket in both Lambdas. The simulated fixings generated in Controller are saved to S3 and then read by worker Lambdas. The location and file names in S3 are specified as environment variables.

All the code is available at

Here are some relevant portions of the controller Lambda function:

nPaths = 500
process = HullWhiteProcess(YieldTermStructureHandle(marketCurve), a, sigma)
#Generate paths, resulting array dimension: n, len(timeGrid)
timeGrid = grid.GetTimeGrid()
sequenceGenerator = UniformRandomSequenceGenerator(len(timeGrid), UniformRandomGenerator())
gaussianSequenceGenerator = GaussianRandomSequenceGenerator(sequenceGenerator)
maturity = timeGrid[len(timeGrid) - 1]
pathGenerator = GaussianPathGenerator(process, maturity, len(timeGrid), gaussianSequenceGenerator, False)
paths = Numpy.zeros(shape=(nPaths, len(timeGrid)))
for i in range(nPaths):
    path = pathGenerator.next().value()
    paths[i, :] = Numpy.array([path[j] for j in range(len(timeGrid))])

forecastingCurve = RelinkableYieldTermStructureHandle()
index = USDLibor(Period(3, Months), forecastingCurve)
transaction = CreateSwapTransaction(index)

Here is how Controller Lambda calls Worker Lambdas:

for s in range(nPaths):
    print('Simulation # : {}'.format(s))
    sim_event = dict([("simulation_num", s),
                      ("first_index_fixing", firstIndexFixing),
                      ("a", a),
                      ("sigma", sigma),
                      ("settlement_date", ql_to_pydate(settlementDate).strftime('%m/%d/%Y')),
                      ("end_date", ql_to_pydate(endDate).strftime('%m/%d/%Y')),
                      ("grid_step_period", "1W"),
                      ("one_path", paths[s, :].tolist())])
    payload = json.dumps(sim_event)
    # Call to another Lambda function async
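The asynchronous invocation the comment refers to could look like the sketch below. The WORKER_FUNCTION_NAME environment variable and the build_payload helper are my assumptions; the actual names are not shown in the post.

```python
import json
import os


def build_payload(sim_event):
    """Serialize one simulation event for the worker Lambda."""
    return json.dumps(sim_event)


def invoke_worker(sim_event):
    """Fire-and-forget call to the worker Lambda for one simulation path."""
    import boto3  # imported lazily so the helper above is usable without AWS access
    client = boto3.client('lambda')
    return client.invoke(
        FunctionName=os.environ['WORKER_FUNCTION_NAME'],  # assumed env var
        InvocationType='Event',  # 'Event' = asynchronous invocation
        Payload=build_payload(sim_event),
    )
```

With InvocationType='Event', the call returns immediately and the worker runs in the background, which is what lets all 500 paths be processed in parallel.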


With AWS Lambda cold starts included, the time taken to compute swap NPVs for 500 paths and 262 dates is about 2 minutes, compared to 7 and a half minutes on my laptop. After kicking off the controller Lambda, I wait a minute for the output S3 bucket to have some files; this could be optimized with another signalling mechanism such as queues. The Lambdas themselves can be warmed up beforehand so that dependencies are loaded and they are ready to go, which could save additional time. In the end, I think the AWS ecosystem presents an interesting alternative to multi-threading or GPU-based computing solutions.


Estimating Potential Future Exposure with QuantLib and AWS EMR – Part II

In my previous post, we saw how to submit a Pyspark job to AWS EMR cluster. In this post, I will go over the setup of the cluster.

Before we start with the cluster, we must have a certificate keypair (.pem file) and a security group setup in AWS. There are many resources available online for this and I will not go into the details.

Next is selecting the custom AMI that I mentioned in my previous post as the base machine for all nodes in the cluster, so that Anaconda Python and QuantLib are already installed on the nodes. The AMI I used is publicly available for now; it costs me about $5 per month to keep alive. The details of the AMI are shown below.

AMI ID ami-c56a94bf

I selected 1 m3.xlarge (4 cores, 15 GB) machine for my master node and 3 r4.8xlarge (16 cores, 244 GB) machines for my worker nodes. I also always select spot pricing for all my nodes.

Once the cluster is up and running (in waiting mode), you first need to secure-copy the job file (available at my spark-pfe github repo) to the master node, and then you can SSH into the master node to run the job using the spark-submit script from the previous post.

It took about 7 minutes for the Pyspark job to complete: it computed netting set NPVs for 5000 simulations across 454 future dates for 2 swaps and 1 FxFwd. The output of the simulation was written to an S3 bucket, and now it's time to pull it onto a local machine for analysis. Loading the NPV cube on a local machine is fine for demonstration purposes since we have only 5000 simulations, but for production I would load the NPV cube into Amazon Redshift or Aurora.

We will use boto3 library for downloading the files from S3.

from pylab import *
from boto3.session import Session
from io import StringIO

NSim = 5000 # I know the number of simulations

session = Session(aws_access_key_id='AWS_ACCESS_KEY',
                  aws_secret_access_key='AWS_SECRET_KEY')
s3 = session.resource("s3")
s3.Bucket('your-bucket').download_file('output/time-grid/part-00000', 'time_grid')
s3.Bucket('your-bucket').download_file('output/npv_cube/part-00000', 'npv_cube')
#load the files into numpy arrays for analysis
T = np.loadtxt('time_grid')
data_file = open('npv_cube', 'r')
block = ''

npv_cube = np.zeros((NSim,len(T),2))

#quick way to load, I am sure there are better ways
for line in data_file:
    block += line.replace('[','').replace(']','').lstrip()

npv_cube = np.loadtxt(StringIO(block))
#reshape to array with NSim rows, time-grid columns and with each cell having 2 NPVs
npv_cube = npv_cube.reshape(NSim, len(T), 2)

Once we have the time grid and NPV cube in memory, we can do some plots to visualize the simulated exposure paths. The Blue paths are for Collateralized exposures and Red are for Uncollateralized.

#plot 30 simulated exposure paths (out of 5000)
plt.figure(figsize=(7, 5), dpi=300)
for i in range(0, 30):
    plt.plot(T, npv_cube[i,:,0]/1000., 'r') # Uncollateralized
    plt.plot(T, npv_cube[i, :, 1] / 1000., 'b') # Collateralized
plt.xlabel("Time in years")
plt.ylabel("Exposure in Thousands")
plt.title("Simulated Exposure paths")

Now we can calculate and plot the Expected Exposure where we just take the positive exposures from the above simulated exposure paths.

# Calculate the expected exposure
# take copies so the original NPV cube is not mutated when flooring at zero
uncoll_exposures = npv_cube[:, :, 0].copy()
uncoll_exposures[uncoll_exposures < 0] = 0
uncoll_expected_exposures = np.sum(uncoll_exposures, axis=0) / NSim

coll_exposures = npv_cube[:, :, 1].copy()
coll_exposures[coll_exposures < 0] = 0
coll_expected_exposures = np.sum(coll_exposures, axis=0) / NSim

plt.figure(figsize=(7, 5), dpi=300)
plt.plot(T, uncoll_expected_exposures/1000., 'r', label='Uncollateralized')
plt.plot(T, coll_expected_exposures/1000., 'b', label='Collateralized')
plt.xlabel("Time in years")
plt.ylabel("Expected Exposure in Thousands")
plt.title("Expected Exposure")
plt.legend(loc='upper left')

Now we can plot the PFE curves where we take the 95% quantile of above expected exposures.

# Calculate the PFE curve (95% quantile)
uncoll_PFE_curve = np.percentile(uncoll_exposures, 95, axis=0, interpolation='higher')
coll_PFE_curve = np.percentile(coll_exposures, 95, axis=0, interpolation='higher')
plt.figure(figsize=(7,7), dpi=300)
plt.plot(T, uncoll_PFE_curve, 'r', label='Uncollateralized')
plt.plot(T, coll_PFE_curve, 'b', label='Collateralized')
plt.xlabel("Time in years")
plt.title("PFE Curves")
plt.legend(loc='upper left')

We can now calculate the Maximum PFE for both the curves.

# calculate the maximum pfe
MPFE = np.max(uncoll_PFE_curve)
print('Maximum Uncollateralized PFE: %f' % MPFE)

MPFE = np.max(coll_PFE_curve)
print('Maximum Collateralized PFE: %f' % MPFE)

Maximum Uncollateralized PFE: 260,962.61

Maximum Collateralized PFE: 252,916.08

The spark-submit script that I have used is by no means optimized. I tuned the various Spark memory and executor-count parameters by trial and error within the limited time frame I had. The configuration I came up with works for the cluster I used, but I am sure it can be improved.

Thanks for stopping by.


Estimating Potential Future Exposure with QuantLib and AWS EMR – Part I

Counterparty risk is the risk that a party to an OTC derivatives contract may fail to perform on its contractual obligations, causing losses to the other party. Credit exposure is the actual loss in the event of a counterparty default.

Some of the ways to reduce counterparty risk :

Netting: Offset positive and negative contract values with the same counterparty reduces exposure to that counterparty

Collateral: Holding cash or securities against an exposure

Central counterparties (CCP): Use a third party clearing house as a counterparty between buyer and seller and post margin

Potential Future Exposure (PFE) is a measure of credit risk and is the worst exposure one could have to a counterparty at a certain time in future with a certain level of confidence. For example, for a PFE of $100,000 with 95% confidence, we expect to have an exposure (loss in case counterparty defaults with no recovery) greater than $100,000 in only 5% of scenarios.

Netting set is a group of OTC trades (could be interest rate swaps, FxFwds or CCS) that are facing the same counterparty. An ISDA CSA agreement with that counterparty defines how an exposure is treated within the netting set but usually we can “net” the exposures of different instruments in the set which reduces the exposure. For example, a positive exposure on a swap could be netted with a negative exposure on FxFwd.

Estimating PFE involves simulating future market risk scenarios, calculating "netted" MtM values of the OTC trades facing the same counterparty at various future dates under each scenario, keeping only the positive MtMs, which represent our exposure to the counterparty, and then taking, say, the 95% quantile of the peak exposures.

For an interest rate swap, the market risk factor is the underlying forward curve, which determines the NPV of the floating leg. For an FxFwd, it's the forward interest rate curves for the two currencies and the forward FX rate.

In this post, to generate future scenarios of the curves, I use Hull-White one factor short rate model which is assumed to be calibrated. There are many excellent resources available online which discuss interest rate models, PFE and QuantLib/Python in general, some of which I have used here are:

Hull White Term Structure Simulations

Expected Exposure and PFE Simulation

Derivatives CVA Calculation

The focus of this post is to provide a proof of concept of estimating PFE on Amazon Web Services' Elastic MapReduce (EMR). AWS EMR provides a complete platform of Hadoop ecosystem tools to process vast amounts of data in a dynamically scalable (hence "elastic") environment. I will use Apache Spark hosted in an EMR cluster to generate future curve simulations and perform NPV calculations at various dates for each scenario. For a realistic number of simulations, the number of calculations needed for a netting set comprising even a few swaps easily exceeds the capacity of a single machine.

Challenges and Issues:

But the devil’s always in the details, right? Here are some of the challenges I faced with this whole setup involving Amazon EMR, QuantLib and Python and ways to overcome them:

While developing my Spark PFE "job" to be submitted to the EMR cluster, I needed to spin up a cluster on demand. Obviously, I needed to terminate it as soon as I was done, or else I would continue to be charged for it. This means that every time I spin up a new cluster, I get fresh EC2 instances with no QuantLib or Anaconda Python installed on them. Now, installing Anaconda Python, compiling Boost, then QuantLib, then the QuantLib SWIG wrappers, and making all of it work seamlessly is not for the faint of heart 🙂 But there is a way to solve this. AWS has something called an AMI (Amazon Machine Image), which is really a base machine you can build your cluster with. So I spin up an EC2 instance, install/compile all the software I need, save it as an AMI, and use it as the base machine for the cluster, so that QuantLib/Python is already set up on all nodes from the get-go.

However, AWS EMR requires that if you use a custom AMI for cluster nodes, it must be based on Amazon's own Linux AMI. The Amazon Linux AMI is loosely based on RHEL/CentOS, but it's not the same. I faced several issues compiling Boost as well as QuantLib on this version of Linux, all related to the gcc/g++ version that shipped with it, which was really old (4.3). So I upgraded gcc to version 6.4 and then built Boost and QuantLib on top of it.

In a very simplified view, Spark is a distributed architecture with a master node and a number of worker nodes. All the input, output and intermediate data is stored in memory as resilient distributed datasets (RDDs). Objects are serialized and distributed to the worker nodes in order to be processed. However, I quickly realized that QuantLib/Python SWIG objects cannot be serialized (pickled), so we must re-create QuantLib objects like swaps, dates and curves from pure Python objects on the worker nodes each time we need them.

I also wanted to incorporate collateral movements into my exposure calculations since all bilateral OTC swaps/FxFwds are required to be collateralized and wanted to see how collateral affects PFE. As it turns out, it reduces PFE but doesn’t completely eliminate it due to the margin period of risk (7 days in this post) and asymmetric collateral amounts delivered and posted due to Threshold and MTA requirements. I have referenced Jon Gregory’s Counterparty Credit Risk Second Edition book and spreadsheets for this purpose.

I have also added an FxFwd (EUR/USD in this case) to the netting set, as we deal with large amounts of FxFwds on a day-to-day basis. But the FxFwd example in this post is not very realistic, as there is only one FxFwd, which matures in 1 year and is not replaced. Usually, FxFwds are continually rolled over in a portfolio, but this single FxFwd really skews the netting set NPV until it matures. The way I calculated FxFwd exposure at time t is as follows:

forward points(t) = forward rate - spot rate(t)

YF(t) = year fraction from t until maturity of FxFwd, r_dom(t) = domestic currency zero rate

FxFwd NPV(t) = ((spot rate + fwd points) * FxFwd notional)/( 1 + (r_dom(t) * YF(t)))

FxFwd Exposure(t) = (FxFwd notional * spot rate(t)) – FxFwd NPV(t)

The forward rate is the agreed upon rate and spot rate at time t was obtained from FX rate simulation using Garman-Kohlagen process.
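For reference, the Garman-Kohlhagen dynamics behind that FX simulation are lognormal with a drift equal to the domestic/foreign rate differential. A minimal pure-Python step, assuming constant rates and volatility (the function name is mine):

```python
import math

def gk_evolve(spot, r_dom, r_for, vol, dt, z):
    """One exact lognormal step of Garman-Kohlhagen FX dynamics:
    S(t+dt) = S(t) * exp((r_dom - r_for - vol^2/2)*dt + vol*sqrt(dt)*z)"""
    drift = (r_dom - r_for - 0.5 * vol * vol) * dt
    return spot * math.exp(drift + vol * math.sqrt(dt) * z)

# With z = 0 the spot simply drifts at the rate differential
s1 = gk_evolve(1.1856, 0.02, 0.01, 0.20, 1.0, 0.0)
```

In the job itself the same step is delegated to QuantLib’s `GarmanKohlagenProcess.evolve`, driven by the simulated USD and EUR curves instead of constant rates.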

The r_dom(t) rate was obtained from the same yield curve simulation used for the swap MtM calculations.
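The four formulas above translate to a few lines of Python (function and variable names here are illustrative):

```python
def fxfwd_exposure(spot_t, fwd_rate, notional, r_dom_t, yf_t):
    """FxFwd exposure at time t per the formulas above: discount the
    agreed forward payoff at the domestic zero rate and compare it with
    the spot value of the notional."""
    fwd_points = fwd_rate - spot_t                       # agreed rate minus current spot
    npv = (spot_t + fwd_points) * notional / (1.0 + r_dom_t * yf_t)
    return notional * spot_t - npv

# EUR 10mm bought forward at 1.20; spot has dropped to 1.18 with half a year left
exposure = fxfwd_exposure(1.18, 1.20, 10e6, 0.02, 0.5)
```

Note that spot + forward points collapses back to the agreed forward rate, so the NPV term is just the discounted forward payoff; the exposure is negative here because the spot has moved against the forward buyer.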

The main stages of our Spark job are as follows:

The Spark driver program loads the netting set instruments, the USD swap curve, the EUR swap curve and previous swap fixings (the swaps are “seasoned”, having been traded a year ago) from an Amazon S3 bucket.

We determine the dates on which we want to estimate the netting set NPV, based on a weekly collateral schedule plus any swap reset dates.

We calculate today’s netting set NPV, which will be the base from which the simulated NPVs start.

We generate a matrix of normally distributed random numbers using Spark’s machine learning library, MLlib, which returns an RDD with the random numbers distributed across the cluster in partitions.

The work to compute NPVs (Collateralized and Uncollateralized) will be distributed by Spark to available nodes (containers with executor threads) and will be done in parallel.

At the driver, we wait for all the work to finish, collect the results of all NPV simulations (the “NPV cube”), write them out to a file in an Amazon S3 bucket and terminate the cluster.

Another Python script, run on a local machine (not in the cluster), then reads from this S3 bucket using the boto3 library and computes/visualizes the final quantiles.

Let’s dive into the code. All the code and input files used in this post are available in the Spark-PFE GitHub repo.

Here are the various input files used in this post. Obviously, the swap curves are not real, but they are close to real. The Libor fixings are real, since that data is in the public domain.

usd-libor-swap-curve.csv , eur-libor-swap-curve.csv , instruments.csv, USD3MLibor-Fixings.csv

from pyspark import SparkConf, SparkContext
from pyspark.mllib.random import RandomRDDs
import QuantLib as ql
import datetime as dt
import numpy as np
import math
import sys

# Used in loading the various input text files
def is_number(s):
    try:
        float(s)
        return True
    except ValueError:
        return False

# QuantLib date to Python date
def ql_to_datetime(d):
    return dt.datetime(d.year(), d.month(), d.dayOfMonth())

# Python date to QuantLib date
def py_to_qldate(d):
    return ql.Date(, d.month, d.year)

# Loads libor fixings from input file into an RDD and then collects the results
def loadLiborFixings(libor_fixings_file):

    libor_fixings = sc.textFile(libor_fixings_file) \
        .map(lambda line: line.split(",")) \
        .filter(lambda r: is_number(r[1])) \
        .map(lambda line: (str(line[0]), float(line[1]))).cache()

    fixingDates = r: r[0]).collect()
    fixings = r: r[1]).collect()
    return fixingDates, fixings

# Loads input swap specifications from input file into an RDD and then collects the results
def load_swaps(instruments_file):
    swaps = sc.textFile(instruments_file) \
            .map(lambda line: line.split(",")) \
            .filter(lambda r: r[0] == 'IRS') \
            .map(lambda line: (str(line[1]), str(line[2]), float(line[3]), float(line[4]), str(line[5]))) \
            .collect()
    return swaps

# Loads input FxFwd specifications from input file into an RDD and then collects the results
def load_fxfwds(instruments_file):
    fxfwds = sc.textFile(instruments_file) \
        .map(lambda line: line.split(",")) \
        .filter(lambda r: r[0] == 'FXFWD') \
        .map(lambda line: (str(line[1]), str(line[2]), float(line[3]), float(line[4]), str(line[5]), str(line[6]))) \
        .collect()
    return fxfwds

# Builds a QuantLib swap object from given specification
def makeSwap(today, start, maturity, nominal, fixedRate, index, typ=ql.VanillaSwap.Payer):
    calendar = ql.UnitedStates()
    fixedLegTenor = ql.Period(6, ql.Months)
    floatingLegBDC = ql.ModifiedFollowing
    fixedLegDC = ql.Thirty360(ql.Thirty360.BondBasis)
    spread = 0.0
    settle_date = calendar.advance(start, 2, ql.Days)
    end = calendar.advance(settle_date, maturity, floatingLegBDC)

    fixedSchedule = ql.Schedule(settle_date, end, fixedLegTenor, calendar,
                                ql.ModifiedFollowing, ql.ModifiedFollowing,
                                ql.DateGeneration.Forward, False)
    floatSchedule = ql.Schedule(settle_date, end, index.tenor(), calendar,
                                floatingLegBDC, floatingLegBDC,
                                ql.DateGeneration.Forward, False)
    swap = ql.VanillaSwap(typ, nominal, fixedSchedule, fixedRate, fixedLegDC,
                          floatSchedule, index, spread, index.dayCounter())
    return swap, [index.fixingDate(x) for x in floatSchedule
                  if index.fixingDate(x) >= today][:-1]

Below is the main method invoked by the spark-submit driver program.

# main method invoked by Spark driver program
def main(sc, args_dict):
    # Broadcast dictionary object, which will hold the various pure Python
    # objects needed by the executors.
    # QuantLib SWIG wrappers cannot be pickled and sent to the executors,
    # so we use Spark broadcast variables to pass pure Python objects
    broadcast_dict = {}
    pytoday = dt.datetime(2017, 9, 8)
    broadcast_dict['python_today'] = pytoday
    today = ql.Date(, pytoday.month, pytoday.year)
    ql.Settings.instance().evaluationDate = today

    usd_calendar = ql.UnitedStates()
    usd_dc = ql.Actual365Fixed()
    eurusd_fx_spot = ql.SimpleQuote(1.1856)
    broadcast_dict['eurusd_fx_spot'] = eurusd_fx_spot.value()
    output_dir = args_dict['output_dir']
    instruments_file = args_dict['instruments_file']
    libor_fixings_file = args_dict['libor_fixings_file']

    # Loads USD libor swap curve from input file
    usd_swap_curve = sc.textFile(args_dict['usd_swap_curve_file']) \
        .map(lambda line: line.split(",")) \
        .filter(lambda r: is_number(r[1])) \
        .map(lambda line: (str(line[0]), float(line[1]))).cache()
    usd_curve_dates = r: r[0]).collect()
    usd_disc_factors = r: r[1]).collect()
    broadcast_dict['usd_curve_dates'] = usd_curve_dates
    broadcast_dict['usd_disc_factors'] = usd_disc_factors
    usd_crv_today = ql.DiscountCurve([ql.DateParser.parseFormatted(x,'%Y-%m-%d')
                                      for x in usd_curve_dates] , usd_disc_factors, usd_dc, usd_calendar )
    usd_disc_term_structure = ql.RelinkableYieldTermStructureHandle(usd_crv_today)

    # Loads EUR libor swap curve from input file
    eur_swap_curve = sc.textFile(args_dict['eur_swap_curve_file']) \
        .map(lambda line: line.split(",")) \
        .filter(lambda r: is_number(r[1])) \
        .map(lambda line: (str(line[0]), float(line[1]))).cache()
    eur_curve_dates = r: r[0]).collect()
    eur_disc_factors = r: r[1]).collect()
    broadcast_dict['eur_curve_dates'] = eur_curve_dates
    broadcast_dict['eur_disc_factors'] = eur_disc_factors

    # Build QuantLib index object
    usdlibor3m = ql.USDLibor(ql.Period(3, ql.Months), usd_disc_term_structure)
    # don't need EUR fixings since we don't have a EUR swap
    fixingDates, fixings = loadLiborFixings(libor_fixings_file)
    fixingDates = [ql.DateParser.parseFormatted(r, '%Y-%m-%d') for r in fixingDates]
    three_month_old_date = usd_calendar.advance(today, -90, ql.Days, ql.ModifiedFollowing)
    latestFixingDates = fixingDates[fixingDates.index(three_month_old_date):]
    latestFixings = fixings[fixingDates.index(three_month_old_date):]
    usdlibor3m.addFixings(latestFixingDates, latestFixings)
    broadcast_dict['fixing_dates'] = [ql_to_datetime(x) for x in latestFixingDates]
    broadcast_dict['fixings'] = latestFixings

    swaps = load_swaps(instruments_file)
    broadcast_dict['swaps'] = swaps
    fxfwds = load_fxfwds(instruments_file)
    broadcast_dict['fxfwds'] = fxfwds

    swaps = [
                makeSwap(today, ql.DateParser.parseFormatted(swap[0], '%Y-%m-%d'),
                     ql.Period(swap[1]), swap[2], swap[3], usdlibor3m,
                     ql.VanillaSwap.Payer if swap[4] == 'Payer' else ql.VanillaSwap.Receiver)
                for swap in swaps
            ]

    longest_swap_maturity = max([s[0].maturityDate() for s in swaps])
    broadcast_dict['longest_swap_maturity'] = ql_to_datetime(longest_swap_maturity)

    Nsim = int(args_dict['NSim'])
    NPartitions = int(args_dict['NPartitions'])
    a = float(args_dict['a']) #0.376739
    sigma = float(args_dict['sigma']) #0.0209835
    broadcast_dict['a'] = a
    broadcast_dict['sigma'] = sigma

    # Simulate swap NPVs until we reach the longest maturity
    years_to_sim = math.ceil(ql.Actual360().yearFraction(today,longest_swap_maturity))

    dates = [today + ql.Period(i, ql.Weeks) for i in range(0, 52 * int(years_to_sim))]
    # Very important to add swap reset dates to our universe of dates
    for idx in xrange(len(swaps)):
        dates += swaps[idx][1]
    dates = np.unique(np.sort(dates))
    broadcast_dict['dates'] = [ql_to_datetime(x) for x in dates]

    br_dict = sc.broadcast(broadcast_dict)

    # Write out the time grid to a text file which can be parsed later
    T = [ql.Actual360().yearFraction(today, dates[i]) for i in xrange(1, len(dates))]
    temp_rdd = sc.parallelize(T)
    # coalesce will shrink the number of partitions to 1 so we have only 1 file to write and parse later
    temp_rdd.coalesce(1).saveAsTextFile(output_dir +'/time-grid')

    # the main routine, generate a matrix of normally distributed random numbers and
    # spread them across the cluster
    # we are not setting the number of partitions here as the default parallelism should be enough
    randArrayRDD = RandomRDDs.normalVectorRDD(sc, Nsim, len(T), seed=1L)

    # for each row of the matrix, which corresponds to one simulated path,
    # compute netting set NPV (collateralized and uncollateralized)
    npv_cube = p: calc_exposure(p, T, br_dict))
    # write out the npv cube, remove '[' and ']' added by numpy array to ease parsing later
    npv_cube.coalesce(1).saveAsTextFile(output_dir + '/npv_cube')

Below is the method that will be executed on the executors in parallel on the EMR cluster.

def calc_exposure(rnumbers, time_grid, br_dict):

    # get hold of broadcasted dictionary and rebuild the curves, libor index, swaps and FxFwd
    # we need to do this since QuantLib SWIG objects cannot be passed around

    broadcast_dict = br_dict.value
    today = py_to_qldate(broadcast_dict['python_today'])
    usd_calendar = ql.UnitedStates()
    usd_dc = ql.Actual365Fixed()
    eur_calendar = ql.TARGET()
    eur_dc = ql.ActualActual()

    maturity = 10
    a = broadcast_dict['a']
    sigma = broadcast_dict['sigma']
    dates = [py_to_qldate(x) for x in broadcast_dict['dates']]
    longest_swap_maturity = py_to_qldate(broadcast_dict['longest_swap_maturity'])
    fixing_dates = [py_to_qldate(x) for x in broadcast_dict['fixing_dates']]
    fixings = broadcast_dict['fixings']

    usd_curve_dates = broadcast_dict['usd_curve_dates']
    usd_disc_factors = broadcast_dict['usd_disc_factors']
    usd_t0_curve = ql.DiscountCurve([ql.DateParser.parseFormatted(x, '%Y-%m-%d')
                        for x in usd_curve_dates], usd_disc_factors, usd_dc, usd_calendar)
    usd_disc_term_structure = ql.RelinkableYieldTermStructureHandle(usd_t0_curve)

    eur_curve_dates = broadcast_dict['eur_curve_dates']
    eur_disc_factors = broadcast_dict['eur_disc_factors']
    eur_t0_curve = ql.DiscountCurve([ql.DateParser.parseFormatted(x, '%Y-%m-%d')
                                     for x in eur_curve_dates], eur_disc_factors, eur_dc, eur_calendar)
    eur_disc_term_structure = ql.RelinkableYieldTermStructureHandle(eur_t0_curve)

    eurusd_fx_spot = broadcast_dict['eurusd_fx_spot']
    flat_vol_hyts = ql.BlackVolTermStructureHandle(ql.BlackConstantVol(today, usd_calendar, 0.20, usd_dc))
    usdlibor3m = ql.USDLibor(ql.Period(3, ql.Months), usd_disc_term_structure)
    usdlibor3m.addFixings(fixing_dates, fixings)
    engine = ql.DiscountingSwapEngine(usd_disc_term_structure)
    swaps = broadcast_dict['swaps']
    swaps = [
        makeSwap(today, ql.DateParser.parseFormatted(swap[0], '%Y-%m-%d'),
                 ql.Period(swap[1]), swap[2], swap[3], usdlibor3m,
                 ql.VanillaSwap.Payer if swap[4] == 'Payer' else ql.VanillaSwap.Receiver)
        for swap in swaps
    ]
    # attach the pricing engine so NPV() can be evaluated
    for s in swaps:
        s[0].setPricingEngine(engine)

    # I know there is only one fxfwd in the instruments file
    fxfwds = broadcast_dict['fxfwds']
    fx_startdate = ql.DateParser.parseFormatted(fxfwds[0][0], '%Y-%m-%d')
    fx_tenor = ql.Period(fxfwds[0][1])
    fx_notional = fxfwds[0][2]
    fwd_rate = fxfwds[0][3]
    fx_maturity = usd_calendar.advance(fx_startdate, fx_tenor, ql.Following)

    # define the NPV cube array
    # number of rows = number of dates in time-grid
    # number of columns = 2 (collateralized and uncollateralized NPV)
    npvMat = np.zeros((len(time_grid),2))

    # utility method to calc FxFwd exposure
    def fxfwd_exposure(date, spot, usd_curve):
        usd_zero_rate = usd_curve.zeroRate(usd_dc.yearFraction(date, fx_maturity),
                                           ql.Compounded, ql.Annual).rate()
        yf = usd_dc.yearFraction(date, fx_maturity)
        fwd_points = fwd_rate - spot
        fxfwd_npv = ((spot + fwd_points) * fx_notional) / (1 + (usd_zero_rate * yf))
        fxfwd_exp = (fx_notional * spot) - fxfwd_npv
        return fxfwd_exp

    # Initialize NPV cube with today's NPVs
    nettingset_npv = 0.
    for idx in xrange(len(swaps)):
        nettingset_npv += swaps[idx][0].NPV()
    fxfwd_exp = fxfwd_exposure(today, eurusd_fx_spot, usd_t0_curve)
    nettingset_npv += fxfwd_exp
    npvMat[0] = nettingset_npv

    # Hull White parameter estimations
    def gamma(t):
        forwardRate = usd_t0_curve.forwardRate(t, t, ql.Continuous, ql.NoFrequency).rate()
        temp = sigma * (1.0 - np.exp(- a * t)) / a
        return forwardRate + 0.5 * temp * temp

    def B(t, T):
        return (1.0 - np.exp(- a * (T - t))) / a

    def A(t, T):
        forward = usd_t0_curve.forwardRate(t, t, ql.Continuous, ql.NoFrequency).rate()
        value = B(t, T) * forward - 0.25 * sigma * B(t, T) * sigma * B(t, T) * B(0.0, 2.0 * t)
        return np.exp(value) *, True) /, True)

    usd_rmat = np.zeros(shape=(len(time_grid)))
    usd_rmat[:] = usd_t0_curve.forwardRate(0,0, ql.Continuous, ql.NoFrequency).rate()

    eur_rmat = np.zeros(shape=(len(time_grid)))
    eur_rmat[:] = eur_t0_curve.forwardRate(0, 0, ql.Continuous, ql.NoFrequency).rate()

    spotmat = np.zeros(shape=(len(time_grid)))
    spotmat[:] = eurusd_fx_spot

    # CSA terms for Collateral movements
    IA1 = 0.; IA2 = 0.; threshold1 = 50000.0; threshold2 = 50000.0; MTA1 = 25000.0; MTA2 = 25000.0; rounding = 5000.0
    collateral_held = IA2 - IA1
    collateral_posted = 0.0

    # the main loop of NPV computations
    for iT in xrange(1, len(time_grid)):
        mean = usd_rmat[iT - 1] * np.exp(- a * (time_grid[iT] - time_grid[iT - 1])) + \
               gamma(time_grid[iT]) - gamma(time_grid[iT - 1]) * \
                                        np.exp(- a * (time_grid[iT] - time_grid[iT - 1]))

        var = 0.5 * sigma * sigma / a * (1 - np.exp(-2 * a * (time_grid[iT] - time_grid[iT - 1])))
        rnew = mean + rnumbers[iT-1] * np.sqrt(var)
        usd_rmat[iT] = rnew
        # USD discount factors as generated by HW model
        usd_disc_factors = [1.0] + [A(time_grid[iT], time_grid[iT] + k) *
                                np.exp(- B(time_grid[iT], time_grid[iT] + k) * rnew) for k in xrange(1, maturity + 1)]

        mean = eur_rmat[iT - 1] * np.exp(- a * (time_grid[iT] - time_grid[iT - 1])) + \
               gamma(time_grid[iT]) - gamma(time_grid[iT - 1]) * \
                                      np.exp(- a * (time_grid[iT] - time_grid[iT - 1]))

        var = 0.5 * sigma * sigma / a * (1 - np.exp(-2 * a * (time_grid[iT] - time_grid[iT - 1])))
        rnew = mean + rnumbers[iT - 1] * np.sqrt(var)
        eur_rmat[iT] = rnew
        # EUR discount factors as generated by HW model
        eur_disc_factors = [1.0] + [A(time_grid[iT], time_grid[iT] + k) *
                                    np.exp(- B(time_grid[iT], time_grid[iT] + k) * rnew) for k in xrange(1, maturity + 1)]

        if dates[iT] > longest_swap_maturity:
            break

        # very important to reset the valuation date
        ql.Settings.instance().evaluationDate = dates[iT]
        crv_date = dates[iT]
        crv_dates = [crv_date] + [crv_date + ql.Period(k, ql.Years) for k in xrange(1, maturity + 1)]
        # use the new disc factors to build a new simulated curve
        usd_crv = ql.DiscountCurve(crv_dates, usd_disc_factors, ql.Actual365Fixed(), ql.UnitedStates())
        eur_crv = ql.DiscountCurve(crv_dates, eur_disc_factors, ql.ActualActual(), ql.TARGET())
        # relink the handles so the libor index and engine see the simulated curves
        usd_disc_term_structure.linkTo(usd_crv)
        eur_disc_term_structure.linkTo(eur_crv)
        usdlibor3m.addFixings(fixing_dates, fixings)
        swap_engine = ql.DiscountingSwapEngine(usd_disc_term_structure)

        # build Garman-Kohlagen process for FX rate simulation for FxFwd
        gk_process = ql.GarmanKohlagenProcess(ql.QuoteHandle(ql.SimpleQuote(eurusd_fx_spot)),
                                              usd_disc_term_structure, eur_disc_term_structure, flat_vol_hyts)
        dt = time_grid[iT] - time_grid[iT - 1]
        spotmat[iT] = gk_process.evolve(time_grid[iT - 1], spotmat[iT - 1], dt, rnumbers[iT - 1])
        nettingset_npv = 0.
        for s in range(len(swaps)):
            if usdlibor3m.isValidFixingDate(dates[iT]):
                fixing = usdlibor3m.fixing(dates[iT])
                usdlibor3m.addFixing(dates[iT], fixing)
            nettingset_npv += swaps[s][0].NPV()

        if dates[iT] <= fx_maturity:
            fxfwd_exp = fxfwd_exposure(dates[iT], spotmat[iT], usd_crv)
            nettingset_npv += fxfwd_exp

        # we have uncollateralized netting set NPV
        npvMat[iT,0] = nettingset_npv
        prev_exposure = npvMat[iT-1,1]
        print 'prev_exposure:%f' % prev_exposure
        collateral_held = collateral_held + collateral_posted
        print 'collateral_held:%f' % collateral_held
        nettingset_npv = nettingset_npv + IA2 - IA1
        print 'nettingset_npv:%f' % nettingset_npv
        collateral_required = max(nettingset_npv - threshold2, 0) \
                              - max(-nettingset_npv - threshold1, 0) - collateral_held
        print 'collateral_required:%f' % collateral_required
        collateral_posted = collateral_required
        if collateral_posted > 0:
            if collateral_posted < MTA2:
                collateral_posted = 0.
        elif -collateral_posted < MTA1:
            collateral_posted = 0.
        print 'collateral_posted:%f' % collateral_posted
        if collateral_posted != 0. and rounding != 0.:
            collateral_posted = math.ceil(collateral_posted/rounding) * rounding
        print 'collateral_posted after rounding:%f' % collateral_posted
        if collateral_posted < 0:
            collateral_held = collateral_held + collateral_posted
            collateral_posted = 0.
        print 'collateral_held:%f' % collateral_held
        # we have collateralized netting set NPV
        npvMat[iT,1] = nettingset_npv - collateral_held
        print 'npvMat:%f' % npvMat[iT,1]
        print '========================================='
    return npvMat

Below is the driver program initialization and argument parsing.

if __name__ == "__main__":

    if len(sys.argv) != 10:
        print('Usage: ' + sys.argv[0] + ' <num_of_simulations> <num_of_partitions> <hull_white_a> <hull_white_sigma> '
              '<usd_swap_curve> <eur_swap_curve> <libor_fixings> <instruments> <output_dir>')
        sys.exit(1)
    conf = SparkConf().setAppName("SPARK-PFE")
    sc  = SparkContext(conf=conf)

    args = {'NSim':sys.argv[1], 'NPartitions': sys.argv[2], 'a': sys.argv[3], 'sigma':sys.argv[4],
            'usd_swap_curve_file': sys.argv[5], 'eur_swap_curve_file': sys.argv[6], 'libor_fixings_file': sys.argv[7],
            'instruments_file': sys.argv[8], 'output_dir': sys.argv[9]}

    main(sc, args)

The spark-submit script used to submit this job looks as follows:

PYSPARK_DRIVER_PYTHON=/opt/anaconda/anaconda2/bin/python \
PYSPARK_PYTHON=/opt/anaconda/anaconda2/bin/python \
spark-submit \
--deploy-mode cluster \
--master yarn \
--conf spark.driver.extraLibraryPath="${LD_LIBRARY_PATH}" \
--conf spark.executorEnv.LD_LIBRARY_PATH="${LD_LIBRARY_PATH}"  \
--num-executors 14 \
--conf spark.yarn.executor.memoryOverhead=5120 \
--executor-memory 43G \
--conf spark.yarn.driver.memoryOverhead=5120 \
--driver-memory 43G \
--executor-cores 12 \
--driver-cores 3 \
--conf spark.default.parallelism=168 \
5000 48 0.376739 0.0209835 \
s3://spark-pfe/usd-libor-swap-curve.csv \
s3://spark-pfe/eur-libor-swap-curve.csv \
s3://spark-pfe/USD3MLibor-Fixings.csv \
s3://spark-pfe/instruments.csv \

After the Spark job completes, the output files, which are the time-grid array and the NPV cube, are stored in an S3 bucket. We will use another Python script to pull the files and visualize the PFE, which will be the subject of Part II of this post.
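For reference, the reduction Part II performs on the cube is simple: take a high quantile of positive exposure at each time step. A pure-Python sketch (the function name, nearest-rank quantile method and toy cube are mine; the real cube has thousands of simulated paths):

```python
def pfe_profile(npv_cube, q=0.95):
    """q-quantile positive exposure per time step.

    npv_cube: list of simulated paths, each a list of netting-set NPVs
    over the time grid. Exposure is max(NPV, 0); PFE(t) is the q-quantile
    of exposure across paths, using a nearest-rank rule."""
    n_steps = len(npv_cube[0])
    profile = []
    for t in range(n_steps):
        exposures = sorted(max(path[t], 0.0) for path in npv_cube)
        rank = min(int(q * len(exposures)), len(exposures) - 1)
        profile.append(exposures[rank])
    return profile

# four toy paths over two time steps
cube = [[-10.0, 5.0], [20.0, 15.0], [30.0, -2.0], [40.0, 1.0]]
print(pfe_profile(cube))  # [40.0, 15.0]
```

The maximum of this profile over the time grid is the headline PFE number.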


Swap Curve construction using Global Optimization

In my previous post, I discussed option implied volatility and the binomial model. In this post, we switch gears and discuss swap curve construction. Building a swap curve is so fundamental to interest rate derivatives pricing that it is probably one of the most closely guarded pieces of proprietary information on a trading desk.

Pricing an interest rate swap involves bootstrapping a blended curve of different instruments based on their maturities and market liquidity. Usually cash deposits and Eurodollar futures are used at the short end, and market swap rates at the long end. At the long end, however, only a subset of market swap rates is available, and bootstrapping requires the missing rates to be interpolated from the known ones. This makes the interpolation methodology a critical part of curve building. Also, since forward rates are a gradient of the discount rates, any misalignment in the latter is magnified in the former.

There is an alternative to bootstrapping called the global optimization approach, where the short end of the curve is bootstrapped as usual, but at the longer end we “guess” the forward rates, compute the par rates of the market swaps and minimize the error between the actual and computed par rates. We also add a smoothness constraint to the minimization so that the overall gradient of the curve is minimized. This approach is illustrated in the excellent book Swaps and Other Derivatives, 2nd Ed., by Richard Flavell.

I will use QuantLib to generate swap schedules and to deal with business day conventions. QuantLib can of course build a swap curve end to end, but I will use Scipy’s optimize package for the curve building. My objective was to match Spreadsheet 3.9, “Building a blended curve”, from the above book. Unfortunately, the spreadsheet shows the final result but not the equations fed to Excel’s Solver, which leaves considerable ambiguity that I hope to clear up here.

Note on QuantLib and Python

There are numerous resources online on building QuantLib from source and then building the Python extensions; I would like to point you to the precompiled QuantLib-Python packages maintained by Christoph Gohlke. If you are on Windows, you can just install the whl package and get started.

First some common formulae we will be using:

$$Discount\ Factor : DF_t = \frac{1}{1 + r_t d_t}$$ where $d_t$ is the year fraction and $r_t$ is the annual rate

$$Forward\ Rate : F_{t/T} = \frac{(DF_t/DF_T) - 1}{T - t}$$ where $DF_t$ is the discount factor to $t$ and $DF_T$ is the discount factor to $T$ (both from today)

$$Par\ Rate\ of\ Swap : \frac{1 - D_T}{\sum_{n=1}^{N} \Delta_n D_n}$$ where $D_T$ is the maturity-date discount factor, $\Delta_n$ are the year fractions between consecutive reset dates and $D_n$ are the reset-date discount factors.
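As a sanity check, the three formulas chain together in a few lines of Python (the numbers are illustrative, not the spreadsheet’s; these helpers mirror `get_disc_factors` and the par-rate calculation used in the script below):

```python
def discount_factors(yfs, fwds):
    """Chain forward rates into discount factors: D_i = D_{i-1} / (1 + f_i * yf_i)."""
    dfs, d = [], 1.0
    for yf, f in zip(yfs, fwds):
        d /= 1.0 + f * yf
        dfs.append(d)
    return dfs

def par_rate(yfs, dfs):
    """Par swap rate: (1 - D_T) / sum(delta_n * D_n)."""
    return (1.0 - dfs[-1]) / sum(y * d for y, d in zip(yfs, dfs))

yfs = [0.25] * 8          # quarterly periods over 2 years
fwds = [0.03] * 8         # flat 3% simple forward curve
dfs = discount_factors(yfs, fwds)
r = par_rate(yfs, dfs)    # on a flat forward curve the par rate matches the forwards
```

On a flat simple forward curve the par rate comes out exactly equal to the forward rate, which is a useful quick test for any curve-building code.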

import QuantLib as ql
import numpy as np
from scipy.optimize import least_squares
from scipy.interpolate import interp1d
import math
import timeit
import matplotlib.pyplot as plt

# we are trying to match Flavell's spreadsheet 3.9
today = ql.Date(4, ql.February,2008)
calendar = ql.JointCalendar(ql.UnitedStates(), ql.UnitedKingdom())
ql.Settings.instance().evaluationDate = today
settle_date = calendar.advance(today, 2, ql.Days)
#define a Swap class to keep state of various dates and year fractions

class MySwap(object):
    def __init__(self, floating_dates, fixed_dates, fixed_rate):
        super(MySwap, self).__init__()
        self.fixed_rate = fixed_rate
        self.floating_dates = floating_dates
        self.fixed_dates = fixed_dates
        self.floating_leg_from_dates = floating_dates[:-1]
        self.floating_leg_to_dates = floating_dates[1:]
        self.fixed_leg_from_dates = fixed_dates[:-1]
        self.fixed_leg_to_dates = fixed_dates[1:]
        self.yf_floating_leg = [ql.Thirty360(ql.Thirty360.BondBasis).yearFraction(d1,d2) for d1,d2
                                in zip(self.floating_leg_from_dates,self.floating_leg_to_dates)]
        self.yf_fixed_leg = [ql.Thirty360(ql.Thirty360.BondBasis).yearFraction(d1, d2) for d1, d2
                                in zip(self.fixed_leg_from_dates, self.fixed_leg_to_dates)]
    # for sorting 2 swaps by maturity date   
    def __gt__(self, swap2):
        return self.floating_leg_to_dates[-1] > swap2.floating_leg_to_dates[-1]
#class for Eurodollar futures

class MyFuture(object):
    def __init__(self, begin_date, mkt_yield):
        super(MyFuture, self).__init__()
        self.begin_date = begin_date
        self.mkt_yield=  mkt_yield
#class for Cash deposits

class MyDeposit(object):
    def __init__(self, begin_date, end_date, mkt_yield):
        super(MyDeposit, self).__init__()
        self.begin_date = begin_date
        self.end_date = end_date
        self.mkt_yield=  mkt_yield
        self.yf = ql.Thirty360(ql.Thirty360.BondBasis).yearFraction(begin_date, end_date)
        self.disc_factor = 1/(1+(self.yf*mkt_yield))
# A utility function to recursively calculate a series of discount factors from year fractions
# and forward rates

def get_disc_factors(yf, rates):
    df = [0] * len(yf)
    df[0] = 1.0 / (1.0 + (yf[0] * rates[0]))
    for i in range(1, len(yf)):
        df[i] = 1.0 / (1.0 + (yf[i] * rates[i])) * df[i - 1]
    return df
# this is the objective function which will be minimized using least squares method
# we calculate the par swap rate from the "guessed" forward rates 
# and minimize the difference between this par swap rate and the market quoted par swap rate
# the first 4 forward rates have been derived from deposit and Eurodollar futures and
# are known

def scipy_func_to_minimize(fwdGuesses):

    x = np.concatenate([known_fwds,fwdGuesses])
    df_float_leg_longest_swap = get_disc_factors(swaps[-1].yf_floating_leg, x)
    def par_rate(yf,df):
        up = 1 - df[-1]
        down = np.sum([y*d for y,d in zip(yf,df)])
        return up/down

    def swap_rate(swap):

        df = np.interp([d.serialNumber() for d in swap.fixed_leg_to_dates],
                       [dl.serialNumber() for dl in swaps[-1].floating_leg_to_dates],
                       df_float_leg_longest_swap)
        return par_rate(swap.yf_fixed_leg, df)

    fi = [swap_rate(s) - s.fixed_rate for s in swaps]
    # append the smoothness penalty as one more residual to minimize
    sq = smoothness_func(x)
    return fi + [sq]
# This function ensures a smooth forward curve by minimizing the gradient of the 
# forward rates
# This function is added as an additional constraint to the above 
# objective function

def smoothness_func(fwdGuesses):
    sq = 0.0
    for i, f in enumerate(fwdGuesses[:-1]):
        sq += math.pow((fwdGuesses[i] - fwdGuesses[i + 1]), 2)
    return sq
# A utility function to define a vanilla IRS, specifically USD-3M-Libor 
# fixed-float swap

def makeVanilla1Y3MSwap(maturity, fixedRate, index):

    end = calendar.advance(settle_date, maturity)
    fixedLegTenor = ql.Period("1y")
    fixedLegBDC = ql.ModifiedFollowing
    fixedLegDC = ql.Thirty360(ql.Thirty360.BondBasis)
    spread = 0.0
    fixedSchedule = ql.Schedule(settle_date, end, fixedLegTenor, calendar,
                                fixedLegBDC, fixedLegBDC,
                                ql.DateGeneration.Forward, False)
    floatSchedule = ql.Schedule(settle_date, end, index.tenor(), calendar,
                                ql.ModifiedFollowing, ql.ModifiedFollowing,
                                ql.DateGeneration.Forward, False)
    swap = ql.VanillaSwap(ql.VanillaSwap.Payer, 1.0, fixedSchedule, fixedRate, fixedLegDC,
                          floatSchedule, index, spread, index.dayCounter())
    return swap
# Market data as of 2/4/2008

libor3m = ql.USDLibor(ql.Period(3, ql.Months))
deposits = [(calendar.advance(settle_date, ql.Period(1, ql.Weeks)),0.032175),
            (calendar.advance(settle_date, ql.Period(1, ql.Months)),0.031813),
            (calendar.advance(settle_date, ql.Period(3, ql.Months)),0.03145)]

depos = [MyDeposit(settle_date, t[0], t[1]) for t in deposits]

# ED Futures start dates : Mar-08,Jun-08,Sep-08,Dec-08,Mar-09
fQuotes =   [97.000,97.410,97.520,97.495,97.395]
futures = []

date_holder = settle_date
for fz in fQuotes:
    fut_begin_date = ql.IMM_nextDate(date_holder)
    futures.append(MyFuture(fut_begin_date, (100.-fz)/100.))
    date_holder = fut_begin_date

ql_swaps = [makeVanilla1Y3MSwap(ql.Period(2, ql.Years),2.795e-2,libor3m),
            makeVanilla1Y3MSwap(ql.Period(3, ql.Years),3.035e-2,libor3m),
            makeVanilla1Y3MSwap(ql.Period(4, ql.Years),3.275e-2,libor3m),
            makeVanilla1Y3MSwap(ql.Period(5, ql.Years),3.5050e-2,libor3m),
            makeVanilla1Y3MSwap(ql.Period(6, ql.Years),3.715e-2,libor3m),
            makeVanilla1Y3MSwap(ql.Period(7, ql.Years),3.885e-2,libor3m),
            makeVanilla1Y3MSwap(ql.Period(8, ql.Years),4.025e-2,libor3m),
            makeVanilla1Y3MSwap(ql.Period(9, ql.Years),4.155e-2,libor3m),
            makeVanilla1Y3MSwap(ql.Period(10, ql.Years),4.265e-2,libor3m),
            makeVanilla1Y3MSwap(ql.Period(12, ql.Years),4.435e-2,libor3m),
            makeVanilla1Y3MSwap(ql.Period(15, ql.Years),4.615e-2,libor3m),
            makeVanilla1Y3MSwap(ql.Period(20, ql.Years),4.755e-2,libor3m),
            makeVanilla1Y3MSwap(ql.Period(25, ql.Years),4.805e-2,libor3m),
            makeVanilla1Y3MSwap(ql.Period(30, ql.Years),4.815e-2,libor3m)]

swaps = [MySwap([d for d in x.floatingSchedule()], [d for d in x.fixedSchedule()], x.fixedRate()) for x in ql_swaps]
#Starting to build fwd curve, add deposits first

fwd_curve = [(dep[0], dep[1]) for dep in deposits]

# get next 3 forward rates from futures rates by interpolating on the 
# provided rates 

fut_curve_dates = [calendar.advance(settle_date, ql.Period(3*i, ql.Months)) for i in range(1, 5)]
fut_interpolation = interp1d([f.begin_date.serialNumber() for f in futures], [f.mkt_yield for f in futures])
fwd_curve.extend([(fut_curve_dates[idx], fut_interpolation(fut_curve_dates[idx-1].serialNumber())[()]) 
                  for idx, f in enumerate(fut_curve_dates) if idx > 0])
# At this point we know the first 4 forward rates
# and we guess the next 116 forwards to be the same as the 4th forward rate
known_fwds = []
for d in fut_curve_dates:
    known_fwds.extend([f[1] for f in fwd_curve if f[0] == d])
fwd_guesses = [known_fwds[-1]] * (len(swaps[-1].floating_leg_to_dates)-len(known_fwds))
# Now we proceed with optimization and get the optimized forwards 
# that provide the arbitrage-free and smooth forward curve

start_time = timeit.default_timer()
x = least_squares(scipy_func_to_minimize, fwd_guesses, verbose=1).x
print('### Scipy optimization took:%s seconds' % str(timeit.default_timer() - start_time))
optimized_fwds = np.concatenate([known_fwds, x])

`gtol` termination condition is satisfied.
Function evaluations: 20, initial cost: 1.7812e-03, final cost 1.5585e-09, first-order optimality 1.04e-08.
### Scipy optimization took:2.5262434 seconds

#Start building a discount curve as a list of tuples (date,disc_factor), 
# first discount factor = 1.0

dates_df = [(settle_date, 1.0)]
for idx, fwd in enumerate(optimized_fwds):
    temp_tuple = (swaps[-1].floating_leg_to_dates[idx],
                  dates_df[idx][1]/(1 + (optimized_fwds[idx] * swaps[-1].yf_floating_leg[idx])))
    dates_df.append(temp_tuple)
#Start building a zero curve

zc_rates = []
cumulative_floating_yf = np.cumsum(swaps[-1].yf_floating_leg)
cumulative_floating_yf = np.insert(cumulative_floating_yf, 0, 0.)
entire_date_range = swaps[-1].floating_leg_from_dates + [swaps[-1].floating_leg_to_dates[-1]]
zc_rates = [ (dt, -np.log(dc[1])/yf) for idx, (dc, yf, dt) in enumerate(zip(dates_df,cumulative_floating_yf, entire_date_range)) if idx > 0]
zc_temp_tup = (swaps[-1].floating_leg_from_dates[0], zc_rates[0][1])
zc_rates.insert(0, zc_temp_tup)
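As a quick sanity check of the zero-rate relation used above, $z = -\ln(df)/\tau$, here is a standalone numeric sketch (not tied to the curve data; names are mine):

```python
import math

# continuously compounded zero rate from a discount factor, z = -ln(df)/tau
def zero_rate(df, tau):
    return -math.log(df) / tau

# a flat 3% curve should invert exactly
df_2y = math.exp(-0.03 * 2.0)
z = zero_rate(df_2y, 2.0)  # -> 0.03
```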
#We already have forward rates, just put them in a list of tuples

fwd_rates = [(d,fwd) for d, fwd in zip(swaps[-1].floating_leg_from_dates, optimized_fwds)]
# Plot all 3 curves

_ = plt.figure(figsize=(10, 8.3))

_ = plt.subplot(221)
x_axis = cumulative_floating_yf[1:]
y_axis = list(f[1] for f in fwd_rates)
_ = plt.plot(x_axis, y_axis)
_ = plt.xlabel('Time')
_ = plt.ylabel('Forward Rates')
_ = plt.title('Forward Curve')

_ = plt.subplot(222)
x_axis = cumulative_floating_yf
y_axis = list(d[1] for d in dates_df)
_ = plt.plot(x_axis, y_axis)
_ = plt.xlabel('Time')
_ = plt.ylabel('Discount Factors')
_ = plt.title('Discount Curve')

_ = plt.subplot(223)
x_axis = cumulative_floating_yf
y_axis = list(z[1] for z in zc_rates)
_ = plt.plot(x_axis, y_axis)
_ = plt.xlabel('Time')
_ = plt.ylabel('Zero Rates')
_ = plt.title('Zero Curve')

Option Volatility and Binomial Model

In my previous post, Options and Volatility Smile, we used the Black-Scholes formula to derive implied volatility from a given option strike and tenor. Most of the options traded on exchanges are American (with a few index options being European) and can be exercised at any time prior to expiration. Whether it is optimal to exercise an option early depends on factors such as dividends and the level of interest rates, and is a complex subject in its own right. What I want to focus on here is using the Binomial model to price an American option. Below I summarize the Binomial model theory from the excellent book Derivatives Markets, 3rd Ed., by Robert McDonald.

There are many flavors of the Binomial model, but they all have the following steps in common:

  • Simulate future prices of underlying stock at various points in time until expiry
  • Calculate the payoff of the option at expiry
  • Discount the payoff back to today to get the option price

1. Simulate future prices of the underlying

Assume the price of a stock is $S_0$ and that over a small time period $\Delta t$ the price can either rise to $S_0u$ or fall to $S_0d$, where $u$ is the factor by which the price rises and $d$ is the factor by which it falls. The stock is assumed to follow a random walk; let $p$ be the probability of the price rising and $(1-p)$ the probability of it falling. There are many ways to choose the values of $u$, $d$ and $p$, and the various Binomial models differ in how these three parameters are calculated.

In the Cox-Ross-Rubinstein (CRR) model, $u = \frac{1}{d}$ is assumed. Since we have 3 unknowns, 2 more equations are needed, and those come from the risk-neutral pricing assumption. Over a small $\Delta t$, the expected return of the stock is

$$pu + (1-p)d = e^{r \Delta t}$$

and the expected variance of the returns is

$$pu^2 + (1-p)d^2 - (e^{r \Delta t})^2 = \sigma ^2 \Delta t$$

Solving for $u$, $d$ and $p$, we get

$$p = \frac{e^{r\Delta t} - d}{u-d}$$

$$u = e^{\sigma \sqrt{\Delta t}}$$

$$d = e^{-\sigma\sqrt{\Delta t}}$$
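The CRR formulas above can be checked quickly in Python; a minimal sketch (function and parameter names are mine):

```python
import math

# CRR parameters from the formulas above: u = 1/d and the risk-neutral p
def crr_params(r, sigma, dt):
    u = math.exp(sigma * math.sqrt(dt))
    d = math.exp(-sigma * math.sqrt(dt))
    p = (math.exp(r * dt) - d) / (u - d)
    return u, d, p

# e.g. monthly steps with r = 5%, sigma = 20%
u, d, p = crr_params(r=0.05, sigma=0.2, dt=1.0 / 12.0)
```

Note that $u \cdot d = 1$ by construction, and $p$ must lie in $(0, 1)$ for the risk-neutral probabilities to make sense.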

The CRR model generates the following tree as we simulate multi-step stock price movements; this is a recombining tree centered around $S_0$.

2. Calculating payoffs at expiry

In this step, we calculate the payoff at each node that corresponds to expiry.

For a put option, $payoff = \max(K - S_N, 0)$

For a call option, $payoff = \max(S_N - K, 0)$

where $N$ is the node at expiry with stock price $S_N$ and $K$ is the strike.

3. Discounting the payoffs

In this step, we discount the payoffs at expiry back to today using backward induction where we start at expiry node and step backwards through time calculating option value at each node of the tree.

For an American put, $V_n = \max(K - S_n,\; e^{-r \Delta t} (p V_u + (1-p) V_d))$

For an American call, $V_n = \max(S_n - K,\; e^{-r \Delta t} (p V_u + (1-p) V_d))$

$V_n$ is the option value at node $n$

$S_n$ is the stock price at node $n$

$r$ is the risk-free interest rate

$\Delta t$ is the time step

$V_u$ is the option value from the upper node at step $n+1$

$V_d$ is the option value from the lower node at step $n+1$
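The three steps can be sketched as a minimal CRR American put pricer. This is a standalone illustration of the backward-induction recipe, not the Leisen-Reimer code used later in this post; all names are mine:

```python
import math
import numpy as np

def crr_american_put(S0, K, r, sigma, T, N):
    """Minimal CRR binomial pricer for an American put (steps 1-3 above)."""
    dt = T / N
    u = math.exp(sigma * math.sqrt(dt))
    d = 1.0 / u
    p = (math.exp(r * dt) - d) / (u - d)
    disc = math.exp(-r * dt)
    # Step 1: stock prices at expiry; node j has price S0 * u**(N-j) * d**j
    ST = S0 * u ** np.arange(N, -1, -1) * d ** np.arange(0, N + 1)
    # Step 2: option payoff at expiry
    V = np.maximum(K - ST, 0.0)
    # Step 3: backward induction with an early-exercise check at every node
    for i in range(N - 1, -1, -1):
        Si = S0 * u ** np.arange(i, -1, -1) * d ** np.arange(0, i + 1)
        cont = disc * (p * V[:-1] + (1 - p) * V[1:])  # discounted expectation
        V = np.maximum(K - Si, cont)                  # exercise vs. continue
    return V[0]

price = crr_american_put(S0=100.0, K=100.0, r=0.05, sigma=0.2, T=1.0, N=500)
```

The American price always sits at or above the European Black-Scholes put (about 5.57 for these parameters), the difference being the early-exercise premium.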

All the variants of the Binomial model, including CRR, converge to Black-Scholes in the limit $\Delta t \to 0$, but the rate of convergence differs. The variant I will use is the Leisen-Reimer model, which converges much faster. Please see the original paper for the formulas; I have ported a C++ implementation to Python in the next section.

The Python code will look very similar to the Options and Volatility Smile post, except that we swap out the Black-Scholes framework for the Leisen-Reimer model. We will also use the same option chain data, AAPL_BBG_vols.csv.

A note on Python code

I usually do not write code like the below; I am purposely avoiding classes so that the focus remains on the objective, which is to understand the technique.

import math
import numpy as np
import matplotlib as mpl
import pandas as pd
import matplotlib.pyplot as plt
from scipy.integrate import quad
from scipy.optimize import brentq
from scipy.interpolate import interp2d
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm

def BinomialLeisenReimer(S0, K, r, T, N, sigma, div):

    is_call = False

    N = N if (N % 2 == 1) else (N + 1)  # Leisen-Reimer requires an odd number of steps

    d1 = (math.log(S0 / K) + ((r - div) + (sigma ** 2) / 2.) * T) / (sigma * math.sqrt(T))

    d2 = (math.log(S0 / K) + ((r - div) - (sigma ** 2) / 2.) * T) / (sigma * math.sqrt(T))

    dt = T / N
    r_n = math.exp((r - div) * dt)
    df = math.exp(-r * dt)  # per-step discount factor at the risk-free rate
    method = 2.
    Term1 = math.pow((d1 / (N + 1.0 / 3.0 - (1 - method) * 0.1 / (N + 1))), 2) * (N + 1.0 / 6.0)
    pp = 0.5 + math.copysign(1, d1) * 0.5 * math.sqrt(1 - math.exp(-Term1))

    Term2 = math.pow((d2 / (N + 1.0 / 3.0 - (1 - method) * 0.1 / (N + 1))), 2) * (N + 1.0 / 6.0)
    p = 0.5 + math.copysign(1, d2) * 0.5 * math.sqrt(1 - math.exp(-Term2))

    u = r_n * pp / p
    d = (r_n - p * u) / (1 - p)

    qu = p
    qd = 1 - p

    #Initialize price tree

    STs = [np.array([S0])]
    # Simulate the possible stock prices path
    for i in range(N):
        prev_branches = STs[-1]
        st = np.concatenate((prev_branches * u, [prev_branches[-1] * d]))
        STs.append(st)  # Add nodes at each time step

    #Initialize payoff tree
    payoffs = np.maximum(0, (STs[N]-K) if is_call else (K-STs[N]))

    def check_early_exercise(payoffs, node):
        early_ex_payoff = (STs[node] - K) if is_call else (K - STs[node])
        return np.maximum(payoffs, early_ex_payoff)

    #Begin tree traversal
    for i in reversed(range(N)):
        # The payoffs from NOT exercising the option
        payoffs = (payoffs[:-1] * qu + payoffs[1:] * qd) * df
        # Payoffs from exercising, for American options
        early_ex_payoff = (STs[i] - K) if is_call else (K - STs[i])
        payoffs = np.maximum(payoffs, early_ex_payoff)

    return payoffs[0]

headers = ['Date','Strike','Call_Bid','Call_Ask','Call','Maturity','Put_Bid','Put_Ask','Put']
dtypes = {'Date': 'str', 'Strike': 'float', 'Call_Bid': 'float', 'Call_Ask': 'float',
          'Call': 'float', 'Maturity': 'str', 'Put_Bid': 'float', 'Put_Ask': 'float', 'Put': 'float'}
parse_dates = ['Date', 'Maturity']
data = pd.read_csv('AAPL_BBG_vols.csv',skiprows=1,header=None, names=headers, dtype=dtypes, parse_dates=parse_dates)
data['LR_Imp_Vol'] = 0.0
data['TTM'] = 0.0
r = 0.0248 #risk-free rate
S0 = 153.97 # spot price as of 5/11/2017
div = 0.0182
for i in data.index:
    t = data['Date'][i]
    T = data['Maturity'][i]
    K = data['Strike'][i]
    Put = data['Put'][i]
    sigma_guess = 0.2
    time_to_maturity = (T - t).days/365.0
    data.loc[i, 'TTM'] = time_to_maturity
    def func_LR(sigma):
        return BinomialLeisenReimer(S0, K, r, time_to_maturity, ((T - t).days) - 1, sigma, div) - Put

    lr_imp_vol = brentq(func_LR, 0.03, 1.0)
    data.loc[i, 'LR_Imp_Vol'] = lr_imp_vol

ttm = data['TTM'].tolist()
strikes = data['Strike'].tolist()
imp_vol = data['LR_Imp_Vol'].tolist()
f = interp2d(strikes,ttm,imp_vol, kind='cubic')

plot_strikes = np.linspace(data['Strike'].min(), data['Strike'].max(),25)
plot_ttm = np.linspace(0, data['TTM'].max(), 25)
fig = plt.figure()
ax = fig.gca(projection='3d')
X, Y = np.meshgrid(plot_strikes, plot_ttm)
Z = np.array([f(x,y) for xr, yr in zip(X, Y) for x, y in zip(xr,yr) ]).reshape(len(X), len(X[0]))
surf = ax.plot_surface(X, Y, Z, rstride=1, cstride=1, cmap=cm.coolwarm,linewidth=0.1)
ax.set_zlabel('Implied Volatility')
fig.colorbar(surf, shrink=0.5, aspect=5)

Options and Volatility Smile

An equity option represents the right to buy (“call” option) or sell (“put” option) a unit of underlying stock at a pre-specified price (strike) at a predetermined maturity date (European option) or at any time up to the predetermined date (American option).

Option writer sells an option and option holder buys an option.

For a European call option on an index with strike 8,000 and an index level of 8,200 at maturity, the option holder receives the difference of 200 from the option writer. This is called the intrinsic value or payoff of the option from the holder’s point of view.

The payoff function for a call option is $$ h_{T}(S,K) = max[S_{T}-K, 0] \tag{Eq. 1}$$

where T = maturity date, $\ S_T $ is the index level at maturity and K is the strike price.
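Eq. 1 is easy to check against the index example above; a tiny sketch (the function name is mine):

```python
import numpy as np

def call_payoff(S_T, K):
    # Eq. 1: intrinsic value of a call at maturity
    return np.maximum(S_T - K, 0.0)

call_payoff(8200.0, 8000.0)  # -> 200.0, the holder's payoff in the example
```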

  • In-the-money: a call (put) is in-the-money when S > K (S < K)
  • At-the-money: a call or put is at-the-money when $\ S \approx K $
  • Out-of-the-money: a call (put) is out-of-the-money when S < K (S > K)

A fair present value (different from the payoff) of a European call option is given by the Black-Scholes formula:

$\ C_{0}^{*} = C^{BSM}(S_{0},K,T,r,\sigma) \tag{Eq. 2}$

$\ S_{0} $ current index level (spot)
K strike price of the option
T time-to-maturity of the option
r risk-free short rate
$\ \sigma $ volatility or the std dev of the index returns

$\ C^{BSM} = S_{t} \cdot N(d_{1}) - e^{-r(T-t)} \cdot K \cdot N(d_{2})\tag{Eq. 3} $


$\displaystyle N(d) = \frac{1}{\sqrt{2\pi}} \int_{-\infty}^{d} e^{-\frac{1}{2}x^{2}} dx$

$\displaystyle d1 = \frac{\log\frac{S_{t}}{K} + (r + \frac{\sigma^2}{2})(T-t)}{\sigma\sqrt{T-t}}$

$\displaystyle d2 = \frac{\log\frac{S_{t}}{K} + (r - \frac{\sigma^2}{2})(T-t)}{\sigma\sqrt{T-t}}$


# BSM option valuation
import math
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from scipy.integrate import quad
from scipy.optimize import brentq
from scipy.interpolate import interp2d
from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
## Helper functions ##
def dN(x):
    ''' Probability density function of standard normal random variable x.'''
    return math.exp(-0.5 * x ** 2) / math.sqrt(2 * math.pi)

def N(d):
    ''' Cumulative distribution function of standard normal random variable x. '''
    return quad(lambda x: dN(x), -20, d, limit=50)[0]

def d1f(St, K, t, T, r, sigma):
    ''' Black-Scholes-Merton d1 function.
        Parameters see e.g. BSM_call_value function. '''
    d1 = (math.log(St / K) + (r + 0.5 * sigma ** 2)
        * (T - t)) / (sigma * math.sqrt(T - t))
    return d1

def BSM_call_value(St, K, t, T, r, sigma):
    ''' Calculates Black-Scholes-Merton European call option value.
    St: float
        stock/index level at time t
    K: float
        strike price
    t: float
        valuation date
    T: float
        date of maturity/time-to-maturity if t = 0; T > t
    r: float
        constant, risk-less short rate
    sigma: float
        volatility
    Returns
    call_value: float
        European call present value at t
    '''
    d1 = d1f(St, K, t, T, r, sigma)
    d2 = d1 - sigma * math.sqrt(T - t)
    call_value = St * N(d1) - math.exp(-r * (T - t)) * K * N(d2)
    return call_value

def BSM_put_value(St, K, t, T, r, sigma):
    ''' Calculates Black-Scholes-Merton European put option value
        via put-call parity.
    St: float
        stock/index level at time t
    K: float
        strike price
    t: float
        valuation date
    T: float
        date of maturity/time-to-maturity if t = 0; T > t
    r: float
        constant, risk-less short rate
    sigma: float
        volatility
    Returns
    put_value: float
        European put present value at t
    '''
    put_value = BSM_call_value(St, K, t, T, r, sigma) - St + math.exp(-r * (T - t)) * K
    return put_value
# Test Option Payoff and Time value
K = 8000 # Strike price
T = 1.0 # time-to-maturity
r = 0.025 # constant, risk-free rate
vol = 0.2 # constant volatility

# Generate spot prices 
S = np.linspace(4000, 12000, 150)
h = np.maximum(S - K, 0) # payoff of the option
C = [BSM_call_value(Szero, K, 0, T, r, vol) for Szero in S] #BS call option values

plt.plot(S, h, 'b-.', lw=2.5, label='payoff') # plot inner value at maturity
plt.plot(S, C, 'r', lw=2.5, label='present value') # plot option present value
plt.xlabel('index level $S_0$')
plt.ylabel('present value $C(t=0)$')

The present value of the option is always higher than the undiscounted payoff; the difference is the time value. In other words, the option’s present value is composed of the payoff plus the time value. The time value reflects the chance of the option going in-the-money, or further in-the-money, before expiry.
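The decomposition can be illustrated numerically for one spot level; a standalone sketch using scipy's normal CDF rather than the quadrature-based helpers above (parameters match the plot: K = 8000, T = 1, r = 0.025, sigma = 0.2):

```python
import math
from scipy.stats import norm

# present value, intrinsic value and time value at a single spot level
S, K, T, r, sigma = 8200.0, 8000.0, 1.0, 0.025, 0.2
d1 = (math.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * math.sqrt(T))
d2 = d1 - sigma * math.sqrt(T)
present_value = S * norm.cdf(d1) - math.exp(-r * T) * K * norm.cdf(d2)
intrinsic = max(S - K, 0.0)            # the undiscounted payoff
time_value = present_value - intrinsic  # positive before expiry
```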

Simulating Returns

The geometric Brownian motion model underlying the BS equation is given by

$$\displaystyle dS_{t} = rS_{t}dt + \sigma S_{t} dZ_{t}\tag{Eq.4}$$

The discretized version is

$$\displaystyle S_{t} = S_{t – \Delta t} e^{(r – \frac{1}{2}\sigma^2) \Delta t + \sigma \sqrt{\Delta t} z_{t}}\tag{Eq.5}$$

where $t \in \{\Delta t, 2\Delta t, \ldots, T\}$

Using the above discretized version, we will simulate the spot prices with $S_{0}$=100, T=10, r = 0.05 and $\sigma$=0.2

import pandas as pd

# model parameters
S0 = 100.0 # initial index level
T = 10.0 # time horizon
r = 0.05 # risk-less short rate
vol = 0.2 # instantaneous volatility
# simulation parameters
#generate a pd array with business dates, ignores holidays
gbm_dates = pd.date_range(start='2007-10-05', end='2017-10-05', freq='B')
M = len(gbm_dates) # time steps
I = 1 # index level paths
dt = 1 / 252. # 252 business days a year
df = math.exp(-r * dt) # discount factor

# stock price paths
rand = np.random.standard_normal((M, I)) # random numbers
S = np.zeros_like(rand) # stock matrix
S[0] = S0 # initial values
for t in range(1, M): # stock price paths using Eq.5
    S[t] = S[t - 1] * np.exp((r - vol ** 2 / 2) * dt + vol * rand[t] * math.sqrt(dt))
#create a pd dataframe with date as index and a column named "spot"
gbm = pd.DataFrame(S[:, 0], index=gbm_dates, columns=['spot']) 
gbm['returns'] = np.log(gbm['spot'] / gbm['spot'].shift(1)) #log returns
# Realized Volatility 
gbm['realized_var'] = 252 * np.cumsum(gbm['returns'] ** 2) / np.arange(len(gbm))
gbm['realized_vol'] = np.sqrt(gbm['realized_var'])
print(gbm.head())
gbm = gbm.dropna()
                 spot   returns  realized_var  realized_vol
2007-10-08  98.904295 -0.011018      0.030589      0.174898
2007-10-09  98.444867 -0.004656      0.018026      0.134261
2007-10-10  97.696364 -0.007632      0.016911      0.130041
2007-10-11  98.280594  0.005962      0.014922      0.122158
plt.figure(figsize=(9, 6))
plt.subplot(211)
plt.plot(gbm['spot'])
plt.ylabel('daily quotes')
plt.subplot(212)
plt.plot(gbm['returns'])
plt.ylabel('daily log returns')

Implied volatility is the value of $\sigma$ that solves Eq. 2 given the option market quote $C_{0}^{*}$.
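This definition amounts to one-dimensional root finding; a minimal standalone sketch using brentq and scipy's normal CDF (the loop later in this post uses the quadrature-based BSM_put_value instead; names here are mine):

```python
import math
from scipy.optimize import brentq
from scipy.stats import norm

def bsm_call(S, K, T, r, sigma):
    # Black-Scholes call value at t = 0, via scipy's normal CDF
    d1 = (math.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * math.sqrt(T))
    d2 = d1 - sigma * math.sqrt(T)
    return S * norm.cdf(d1) - math.exp(-r * T) * K * norm.cdf(d2)

def implied_vol(C_market, S, K, T, r):
    # solve Eq. 2 for sigma by bracketing the root
    return brentq(lambda sig: bsm_call(S, K, T, r, sig) - C_market, 1e-4, 2.0)

# a quote of 10.4506 for an ATM 1y call at r = 5% recovers sigma = 0.2
iv = implied_vol(10.4506, S=100.0, K=100.0, T=1.0, r=0.05)
```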

Volatility surface is the plot of the implied volatilities for different option strikes and different option maturities on the same underlying (an option chain).

Vol surfaces exhibit:

  • Smiles: option implied volatilities exhibit a smile form, i.e. for calls the OTM implied volatilities are higher than the ATM ones; sometimes they rise again for ITM options
  • Term structure: smiles are more pronounced for short-term options than for longer-term options, a phenomenon sometimes called the volatility term structure

To demonstrate the vol surface, I will use an option chain on AAPL stock as of 5/11/2017. I downloaded this data from a reputable vendor; you can find the file here: AAPL_BBG_vols

headers = ['Date','Strike','Call_Bid','Call_Ask','Call','Maturity','Put_Bid','Put_Ask','Put']
dtypes = {'Date': 'str', 'Strike': 'float', 'Call_Bid': 'float', 'Call_Ask': 'float',
          'Call': 'float', 'Maturity': 'str', 'Put_Bid': 'float', 'Put_Ask': 'float', 'Put': 'float'}
parse_dates = ['Date', 'Maturity']

data = pd.read_csv('AAPL_BBG_vols.csv',skiprows=1,header=None, names=headers, dtype=dtypes, parse_dates=parse_dates)
data['BS_Imp_Vol'] = 0.0
data['TTM'] = 0.0
r = 0.0248 #risk-free rate
S0 = 153.97 # spot price as of 5/11/2017
div = 0.0182
for i in data.index:
    t = data['Date'][i]
    T = data['Maturity'][i]
    K = data['Strike'][i]
    Put = data['Put'][i]
    time_to_maturity = (T - t).days/365.0
    data.loc[i, 'TTM'] = time_to_maturity
    def func_BS(sigma):
        return BSM_put_value(S0, K, 0, time_to_maturity, r, sigma) - Put

    bs_imp_vol = brentq(func_BS, 0.03, 1.0)
    data.loc[i,'BS_Imp_Vol'] = bs_imp_vol

Once we have the implied volatilities, we generate a grid of strikes and maturities and use cubic interpolation to derive the missing implied volatilities needed for a smooth surface.

ttm = data['TTM'].tolist()
strikes = data['Strike'].tolist()
imp_vol = data['BS_Imp_Vol'].tolist()
f = interp2d(strikes,ttm,imp_vol, kind='cubic')

plot_strikes = np.linspace(data['Strike'].min(), data['Strike'].max(),25)
plot_ttm = np.linspace(0, data['TTM'].max(), 25)
fig = plt.figure()
ax = fig.gca(projection='3d')
X, Y = np.meshgrid(plot_strikes, plot_ttm)
Z = np.array([f(x,y) for xr, yr in zip(X, Y) for x, y in zip(xr,yr) ]).reshape(len(X), len(X[0]))
surf = ax.plot_surface(X, Y, Z, rstride=1, cstride=1, cmap=cm.coolwarm,linewidth=0.1)
ax.set_zlabel('Implied Volatility')
fig.colorbar(surf, shrink=0.5, aspect=5)

Valar Dohaeris!

Here is my first post; just a quick one, really. I had these ideas and code I had been playing around with for a while and wanted to put them into something concrete. I plan on posting a few things I have learned on the job or self-learned, things that I find interesting or otherwise. Stay tuned.