By John Lekberg on December 04, 2020.
NOTE: Oscar Martinez pointed out that I omitted the definition of the NegativeCycleError exception. I have fixed this. Thanks, Oscar!
This week's Python blog post is about the "Shortest Path" problem, which is a graph theory problem that has many applications, including finding arbitrage opportunities and planning travel between locations.
You will learn:
- How to solve the "Shortest Path" problem using a brute force solution.
- How to use the Bellman-Ford algorithm to create a more efficient solution.
- How to handle situations with no shortest path -- including negative cycles.
- How to apply your "shortest path solvers" (1) to plan a trip from Paris to Rome, and (2) to identify an arbitrage opportunity on a currency exchange.
As input, you are given:
- A weighted, directed graph.
- A "start" vertex and an "end" vertex.
Your goal is to find the shortest path (minimizing path weight) from "start" to "end". However, no shortest path may exist. Perhaps the graph has a cycle with negative weight, and thus you can repeatedly traverse the cycle to make the path shorter and shorter.
The goal is to find the path that "minimizes the sum of edge weights." Be aware of these two variations:
- "Maximizes the sum of edge weights." To deal with this variation, multiply all edge weights by -1 and then "minimize the sum of negative edge weights."
- "Minimizes the product of edge weights." As long as all edge weights are positive, deal with this variation by taking the logarithm of all edge weights and then "minimize the sum of logarithms of edge weights."
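As a quick sanity check of both variations, here is a minimal sketch. The two candidate paths and their edge weights are made up for illustration; they are not part of the example graph used in this post.

```python
import math

# Hypothetical candidate paths, each given as a list of edge weights.
paths = {
    "via_x": [2.0, 8.0],  # sum 10, product 16
    "via_y": [5.0, 6.0],  # sum 11, product 30
}

# Variation 1: maximizing the sum picks the same path as
# minimizing the sum of the negated weights.
max_sum_path = max(paths, key=lambda p: sum(paths[p]))
min_neg_path = min(paths, key=lambda p: sum(-w for w in paths[p]))

# Variation 2: minimizing the product picks the same path as
# minimizing the sum of logarithms (all weights must be positive).
min_prod_path = min(paths, key=lambda p: math.prod(paths[p]))
min_log_path = min(paths, key=lambda p: sum(math.log(w) for w in paths[p]))

print(max_sum_path, min_neg_path)    # via_y via_y
print(min_prod_path, min_log_path)   # via_x via_x
```

Both transformations work because negation reverses the ordering of sums, and the logarithm is an increasing function that turns products into sums.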
To demonstrate this problem, I'll use two examples: (1) finding the fastest trip between cities and (2) finding arbitrage opportunities in a currency exchange. See sections "scenario: trip scheduling" and "scenario: currency exchange" below for more details.
For the input, consider this directed, weighted graph:
I represent this graph using this Python expression:
    example_graph = {
        "A": {
            "B": 1,
        },
        "B": {
            "A": 3,
            "C": 0,
        },
        "C": {
            "A": -1,
        },
    }
To enumerate all vertices in the graph, I use the helper function all_vertices:
    def all_vertices(graph):
        """Return a set of all vertices in a graph.

        graph -- a weighted, directed graph.
        """
        vertices = set()
        for v in graph.keys():
            vertices.add(v)
            for u in graph[v].keys():
                vertices.add(u)
        return vertices

    all_vertices(example_graph)
{'A', 'B', 'C'}
To check if an edge exists, I use the helper function is_edge:
    def is_edge(graph, tail, head):
        """Return if the edge (tail)->(head) is present in a graph.

        graph -- a weighted, directed graph.
        tail -- a vertex.
        head -- a vertex.
        """
        return (tail in graph) and (head in graph[tail])

    is_edge(example_graph, "A", "B")
True
is_edge(example_graph, "A", "C")
False
To find the weight of an edge, I simply index the dictionary:
example_graph["A"]["B"]
1
example_graph["C"]["A"]
-1
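Because edge weights are plain dictionary lookups, the weight of a whole path is just the sum over consecutive vertex pairs. Here is a small standalone sketch; this two-argument path_weight is a hypothetical helper for illustration, and the graph is repeated so the block runs on its own.

```python
example_graph = {
    "A": {"B": 1},
    "B": {"A": 3, "C": 0},
    "C": {"A": -1},
}

def path_weight(graph, path):
    """Sum the edge weights between consecutive vertices of path."""
    return sum(
        graph[tail][head]
        for (tail, head) in zip(path, path[1:])
    )

print(path_weight(example_graph, ["B", "C", "A"]))  # 0 + (-1) = -1
print(path_weight(example_graph, ["B", "A"]))       # 3
```

Pairing the path with itself shifted by one (zip(path, path[1:])) yields each edge exactly once, so a path with k vertices needs k-1 lookups.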
For the output, if a shortest path exists, then I represent the solution as a tuple of:
- The path weight.
- The path vertices.
For example, for the shortest path from "B" to "A" in the above graph, I represent the solution as:
-1, ["B", "C", "A"]
(-1, ['B', 'C', 'A'])
If no shortest path exists, then I will raise a custom NoShortestPathError exception:
    class NoShortestPathError(Exception):
        pass

    raise NoShortestPathError
NoShortestPathError:
I'm also interested in scenarios when no shortest path exists because of a negative cycle, so I use another custom exception for that:
    class NegativeCycleError(NoShortestPathError):
        def __init__(self, weight, cycle):
            self.weight = weight
            self.cycle = cycle

        def __str__(self):
            return f"Weight {self.weight}: {self.cycle}"

    raise NegativeCycleError(-3, ["A", "B", "A"])
NegativeCycleError: Weight -3: ['A', 'B', 'A']
I use an exception instead of returning an error code because I want to make sure I correctly handle scenarios with no shortest path. If I return an error code, I can ignore it. But a NoShortestPathError exception must be handled with a try-statement; otherwise the exception will propagate until the Python interpreter terminates.
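To make the handling concrete, here is a minimal sketch of a caller that distinguishes the three outcomes. The describe function and the three stub solvers are hypothetical stand-ins, and the exception classes are repeated so the block runs on its own.

```python
class NoShortestPathError(Exception):
    pass

class NegativeCycleError(NoShortestPathError):
    def __init__(self, weight, cycle):
        self.weight = weight
        self.cycle = cycle

    def __str__(self):
        return f"Weight {self.weight}: {self.cycle}"

def describe(solver):
    """Run a zero-argument solver and describe the outcome."""
    try:
        weight, path = solver()
    except NegativeCycleError as error:
        # This clause must come first, because NegativeCycleError
        # is a subclass of NoShortestPathError.
        return f"negative cycle ({error})"
    except NoShortestPathError:
        return "no shortest path"
    else:
        return f"shortest path {path} with weight {weight}"

# Hypothetical stand-ins for real solvers.
def stub_negative_cycle():
    raise NegativeCycleError(-3, ["A", "B", "A"])

def stub_unreachable():
    raise NoShortestPathError

def stub_ok():
    return -1, ["B", "C", "A"]

for stub in (stub_negative_cycle, stub_unreachable, stub_ok):
    print(describe(stub))
```

Making NegativeCycleError a subclass means that callers who only care whether a shortest path exists can catch NoShortestPathError alone and still cover both cases.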
In this scenario, my goal is to plan a trip from Paris to Rome. I have a table of train times between various cities.
    from datetime import timedelta

    example_travel_times = {
        ("Paris", "Zurich"): timedelta(hours=6, minutes=12),
        ("Paris", "Genoa"): timedelta(hours=9, minutes=26),
        ("Paris", "Milan"): timedelta(hours=8, minutes=55),
        ("Zurich", "Milan"): timedelta(hours=3, minutes=12),
        ("Milan", "Genoa"): timedelta(hours=1, minutes=48),
        ("Milan", "Rome"): timedelta(hours=5, minutes=45),
        ("Genoa", "Rome"): timedelta(hours=5, minutes=8),
    }
So I write a function, plan_trip, that will utilize a shortest path algorithm to plan the trip:
    from collections import defaultdict

    def plan_trip(*, travel_times, shortest_path_solver, start, end):
        """Plan the fastest trip between two locations.

        travel_times -- dict. { (cityA, cityB): travel_time, ... }.
        shortest_path_solver -- a function that solves the
            "Shortest Path" problem.
        start -- starting location.
        end -- ending location.
        """
        hour = timedelta(hours=1)

        # Trains run in both directions, so add each edge both ways.
        graph = defaultdict(dict)
        for (a, b), travel_time in travel_times.items():
            weight = travel_time / hour
            graph[a][b] = weight
            graph[b][a] = weight

        try:
            best_weight, best_path = shortest_path_solver(
                graph=graph,
                start=start,
                end=end,
            )
        except NoShortestPathError:
            print(f"No possible trip from {start} to {end}.")
        else:
            best_time = best_weight * hour
            print(f"The fastest trip from {start} to {end} takes {best_time}:")
            for a, b in zip(best_path, best_path[1:]):
                edge_time = graph[a][b] * hour
                print(f"- {a} to {b}, {edge_time}")
(You will see this function in use in the sections "creating a brute force solution" and "creating a more efficient solution with the Bellman-Ford algorithm.")
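The conversion between travel times and edge weights inside plan_trip relies on timedelta arithmetic. Here is a small sketch of that round trip in isolation, using the Paris to Zurich time from the table; the variable names are only for illustration.

```python
from datetime import timedelta

hour = timedelta(hours=1)
travel_time = timedelta(hours=6, minutes=12)

# Dividing a timedelta by a timedelta gives a plain float
# (here, the travel time measured in hours).
weight = travel_time / hour

# Multiplying a float by a timedelta converts it back into a duration.
restored = weight * hour

print(weight)    # 6.2
print(restored)  # 6:12:00
```

Working with floats keeps the solvers independent of timedelta: they only ever need to add and compare numbers.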
In this scenario, my goal is to convert one troy ounce of gold (XAU) into as much USD as possible. I have a table of exchange rates between different currencies:
    from decimal import Decimal

    example_exchange_rates = {
        ("XAU", "CAD"): Decimal("2_323.45"),
        ("XAU", "USD"): Decimal("1_772.84"),
        ("XAU", "MXN"): Decimal("35_560.57"),
        ("CAD", "USD"): Decimal("0.77"),
        ("CAD", "MXN"): Decimal("15.44"),
        ("USD", "MXN"): Decimal("20.05"),
    }
(This table uses ISO 4217 currency codes.)
So I write a function, maximize_profit, that will utilize a shortest path algorithm to maximize my profit:
    from collections import defaultdict

    def maximize_profit(
        *, exchange_rates, shortest_path_solver, start, end
    ):
        """Maximize the profit from exchanging currencies,
        starting and ending on a given currency. If a negative
        cycle is found, then the arbitrage opportunity is
        reported.

        exchange_rates -- dict. { (base, quote): rate, ... }.
        shortest_path_solver -- a function that solves the
            "Shortest Path" problem.
        start -- starting currency.
        end -- ending currency.
        """
        # The reverse exchange uses the reciprocal rate, so its
        # weight is the negation of the forward weight.
        graph = defaultdict(dict)
        for (base, quote), rate in exchange_rates.items():
            weight = -rate.ln()
            graph[base][quote] = weight
            graph[quote][base] = -weight

        try:
            best_weight, best_path = shortest_path_solver(
                graph=graph, start=start, end=end
            )
        except NegativeCycleError as error:
            cycle = error.cycle
            cycle_weight = error.weight
            cycle_rate = (-cycle_weight).exp()
            print("Arbitrage opportunity:")
            print(f"> {cycle[0]}/{cycle[-1]} {cycle_rate}")
            print("Via:")
            for (base, quote) in zip(cycle, cycle[1:]):
                edge_weight = graph[base][quote]
                edge_rate = (-edge_weight).exp()
                print(f"- {base}/{quote} {edge_rate}")
        except NoShortestPathError:
            print(f"No way to exchange {start} into {end}")
        else:
            best_weight = Decimal(best_weight)
            best_rate = (-best_weight).exp()
            print("Optimal exchange:")
            print(f"> {start}/{end} {best_rate}")
            print("Via:")
            for (base, quote) in zip(best_path, best_path[1:]):
                edge_weight = graph[base][quote]
                edge_rate = (-edge_weight).exp()
                print(f"- {base}/{quote} {edge_rate}")
(You will see this function in use in the sections "creating a brute force solution" and "creating a more efficient solution with the Bellman-Ford algorithm.")
The formula I use to convert the exchange rate into an edge weight is:
weight = -log(rate)
I use this formula because the shortest path solver "minimizes the sum of edge weights", while the currency exchange wants to "maximize the product of exchange rates". (Recall the variations mentioned in the problem statement.)
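To see why a profitable cycle shows up as a negative cycle, here is a small sketch using rates from the table for the cycle CAD -> USD -> XAU -> CAD. The USD -> XAU rate is taken as the reciprocal of the listed XAU -> USD rate, which is an assumption that matches how maximize_profit builds the reverse edges.

```python
from decimal import Decimal

# Rates along the cycle CAD -> USD -> XAU -> CAD.
rates = [
    Decimal("0.77"),         # CAD -> USD
    1 / Decimal("1772.84"),  # USD -> XAU (reciprocal of XAU -> USD)
    Decimal("2323.45"),      # XAU -> CAD
]

# weight = -log(rate) for each edge, then sum along the cycle.
weights = [-rate.ln() for rate in rates]
cycle_weight = sum(weights)

# The plain product of the rates, for comparison.
product = Decimal(1)
for rate in rates:
    product *= rate

print(cycle_weight < 0)  # True
print(product > 1)       # True
```

The two conditions are equivalent: the sum of -log(rate) is negative exactly when the product of the rates is greater than 1, which means going around the cycle returns more CAD than you started with.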
My brute force solution has two steps:
- Check for negative cycles by checking all possible cycles.
  - If any negative cycles exist, raise a NegativeCycleError exception.
- Check for candidate paths by checking all possible paths.
  - If no candidate paths exist, raise a NoShortestPathError exception.
  - If any candidate paths exist, take the minimum weight path.
Here's a Python implementation of this:
    from itertools import permutations

    def shortest_path_bf(*, graph, start, end):
        """Find the shortest path from start to end in graph,
        using brute force.

        If a negative cycle exists, raise NegativeCycleError.
        If no shortest path exists, raise NoShortestPathError.
        """
        V = tuple(all_vertices(graph))
        n = len(V)

        def path_weight(path):
            return sum(
                graph[tail][head]
                for (tail, head) in zip(path, path[1:])
            )

        # Every ordering of 1..n distinct vertices whose
        # consecutive pairs are all edges.
        all_paths = [
            path
            for path_n in range(1, n + 1)
            for path in permutations(V, path_n)
            if all(
                is_edge(graph, tail, head)
                for (tail, head) in zip(path, path[1:])
            )
        ]

        # Check for negative cycles.
        all_cycles = (
            (*path, path[0])
            for path in all_paths
            if is_edge(graph, path[-1], path[0])
        )
        for cycle in all_cycles:
            weight = path_weight(cycle)
            if weight < 0:
                raise NegativeCycleError(weight, cycle)

        # Check candidate paths
        candidate_paths = [
            path
            for path in all_paths
            if path[0] == start
            if path[-1] == end
        ]
        if len(candidate_paths) == 0:
            raise NoShortestPathError
        else:
            return min(
                (path_weight(path), path)
                for path in candidate_paths
            )

    shortest_path_bf(graph=example_graph, start="B", end="A")
(-1, ('B', 'C', 'A'))
    plan_trip(
        travel_times=example_travel_times,
        shortest_path_solver=shortest_path_bf,
        start="Paris",
        end="Rome",
    )
The fastest trip from Paris to Rome takes 14:34:00:
- Paris to Genoa, 9:26:00
- Genoa to Rome, 5:08:00
    maximize_profit(
        exchange_rates=example_exchange_rates,
        shortest_path_solver=shortest_path_bf,
        start="XAU",
        end="USD",
    )
Arbitrage opportunity:
> CAD/CAD 1.009147187563457503215180163
Via:
- CAD/USD 0.7700000000000000000000000000
- USD/XAU 0.0005640666952460458924663252184
- XAU/CAD 2323.450000000000000000000001
What's the time complexity of this solution? For a graph with n vertices:
There are
P(n,1) + P(n,2) + ... + P(n,n)
possible paths (same for possible cycles), where P(n,k) = n!/(n-k)! is the number of ordered selections of k vertices out of n. Each term is at most n!, so if I relax this slightly, I can simplify this to
O(n × n!)
Computing the weight of a path of k nodes takes O(k) time. Since k ≤ n, I will simplify this to
O(n)
As a result, the overall time complexity of this solution is
O(n² × n!)
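To get a feel for how quickly this grows, here is a short sketch that counts the vertex orderings the brute force solution has to consider and compares the count with the relaxed bound n × n!. The function name count_vertex_sequences is made up for illustration, and math.perm needs Python 3.8 or later.

```python
from math import factorial, perm

def count_vertex_sequences(n):
    """Return P(n,1) + P(n,2) + ... + P(n,n)."""
    return sum(perm(n, k) for k in range(1, n + 1))

for n in (3, 5, 8):
    exact = count_vertex_sequences(n)
    bound = n * factorial(n)
    print(f"n={n}: {exact} orderings, bound {bound}")
```

For n=3 this gives 15 orderings against a bound of 18, and for n=8 the count already exceeds one hundred thousand, so the brute force solution is only practical for very small graphs.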
The key idea of the Bellman-Ford algorithm is to overestimate the path weights and repeatedly relax the estimates. Each time the relaxation happens, the number of correct path weights increases. And -- for a graph with n vertices -- doing the relaxation process n-1 times will ensure that all vertices have correctly computed path weights.
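To watch the estimates improve, here is a minimal sketch of the relaxation rounds on their own, run on the small example graph from earlier with "B" as the start vertex. It assumes math.inf as the initial overestimate, and relax_once is a hypothetical helper name.

```python
import math

example_graph = {
    "A": {"B": 1},
    "B": {"A": 3, "C": 0},
    "C": {"A": -1},
}

def relax_once(graph, dist):
    """Relax every edge once; return True if any estimate improved."""
    changed = False
    for tail, edges in graph.items():
        for head, weight in edges.items():
            alt = dist[tail] + weight
            if alt < dist[head]:
                dist[head] = alt
                changed = True
    return changed

# Overestimate: every vertex starts at infinity, except the start.
dist = {v: math.inf for v in ("A", "B", "C")}
dist["B"] = 0

# n - 1 rounds of relaxation for n vertices.
for round_number in range(1, len(dist)):
    relax_once(example_graph, dist)
    print(round_number, dist)
```

On this graph the estimates settle at A: -1, B: 0, C: 0. An estimate can only ever decrease, and an edge leaving a vertex that is still at infinity never improves anything, because infinity plus a weight is not less than any estimate.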
Here's a Python implementation of this:
    def shortest_path_bellman_ford(*, graph, start, end):
        """Find the shortest path from start to end in graph,
        using the Bellman-Ford algorithm.

        If a negative cycle exists, raise NegativeCycleError.
        If no shortest path exists, raise NoShortestPathError.
        """
        n = len(all_vertices(graph))

        # A vertex missing from dist has an infinite estimate.
        dist = {}
        pred = {}

        def is_dist_infinite(v):
            return v not in dist

        def walk_pred(start, end):
            path = [start]
            v = start
            while v != end:
                v = pred[v]
                path.append(v)
            path.reverse()
            return path

        def find_cycle(start):
            # Follow predecessors until a vertex repeats.
            nodes = []
            node = start
            while True:
                if node in nodes:
                    cycle = [
                        node,
                        *reversed(nodes[nodes.index(node) :]),
                    ]
                    return cycle
                nodes.append(node)
                if node not in pred:
                    break
                node = pred[node]

        dist[start] = 0

        # Relax approximations (n-1) times.
        for _ in range(n - 1):
            for tail in graph.keys():
                if is_dist_infinite(tail):
                    continue
                for head, weight in graph[tail].items():
                    alt = dist[tail] + weight
                    if is_dist_infinite(head) or (
                        alt < dist[head]
                    ):
                        dist[head] = alt
                        pred[head] = tail

        # Check for negative cycles.
        for tail in graph.keys():
            if is_dist_infinite(tail):
                # Unreachable vertices have no estimate to check.
                continue
            for head, weight in graph[tail].items():
                if (dist[tail] + weight) < dist[head]:
                    cycle = find_cycle(tail)
                    cycle_weight = sum(
                        graph[c_tail][c_head]
                        for (c_tail, c_head) in zip(
                            cycle, cycle[1:]
                        )
                    )
                    raise NegativeCycleError(
                        cycle_weight, cycle
                    )

        # Build shortest path.
        if is_dist_infinite(end):
            raise NoShortestPathError
        best_weight = dist[end]
        best_path = walk_pred(end, start)
        return best_weight, best_path

    shortest_path_bellman_ford(
        graph=example_graph, start="B", end="A"
    )
(-1, ['B', 'C', 'A'])
    plan_trip(
        travel_times=example_travel_times,
        shortest_path_solver=shortest_path_bellman_ford,
        start="Paris",
        end="Rome",
    )
The fastest trip from Paris to Rome takes 14:34:00:
- Paris to Genoa, 9:26:00
- Genoa to Rome, 5:08:00
    maximize_profit(
        exchange_rates=example_exchange_rates,
        shortest_path_solver=shortest_path_bellman_ford,
        start="XAU",
        end="USD",
    )
Arbitrage opportunity:
> XAU/XAU 1.009245235999597360471702672
Via:
- XAU/CAD 2323.450000000000000000000001
- CAD/MXN 15.43999999999999999999999999
- MXN/USD 0.04987531172069825436408977557
- USD/XAU 0.0005640666952460458924663252184
What's the time complexity of the Bellman-Ford algorithm? For a graph with n vertices and m edges:
- Counting the number of vertices takes O(n) time.
- The relaxation process takes O(m×n) time.
- Checking for negative cycles takes O(m) time.
As a result, the overall time complexity of the Bellman-Ford algorithm is
O(m×n)
In this week's post, you learned how to solve the "Shortest Path" problem using a brute force solution, as well as the Bellman-Ford algorithm. You also learned how to identify and properly respond to situations with no shortest path -- including handling negative cycles. You used your "shortest path solvers" to plan a trip from Paris to Rome and to identify an arbitrage opportunity on a currency exchange.
My challenge to you:
Gather your own data for the trip planning and currency exchange scenarios.
What results do plan_trip and maximize_profit give for your data?
If you enjoyed this week's post, share it with your friends and stay tuned for next week's post. See you then!
(If you spot any errors or typos on this post, contact me via my contact page.)