# Pyro: Deep Universal Probabilistic Programming
Gonzalo Rios (grios@dim.uchile.cl)

Pyro is a universal probabilistic programming language (PPL) written in Python and supported by PyTorch on the backend. Pyro enables flexible and expressive deep probabilistic modeling, unifying the best of modern deep learning and Bayesian modeling. It was designed with these key principles:

* Universal: Pyro can represent any computable probability distribution.
* Scalable: Pyro scales to large data sets with little overhead.
* Minimal: Pyro is implemented with a small core of powerful, composable abstractions.
* Flexible: Pyro aims for automation when you want it, control when you need it. 

http://pyro.ai/


# Pyro vs Numpy/Scipy

In [None]:
import numpy as np
import scipy.stats as stats

In [None]:
loc = 0. # mean zero
scale = 1. # unit variance
size = int(1e8)

In [None]:
x_numpy = loc+scale*np.random.randn(size)

In [None]:
x_scipy = stats.norm.rvs(loc=loc, scale=scale, size=size)

In [None]:
logp_scipy = stats.norm.logpdf(x_scipy, loc=loc, scale=scale)

In [None]:
import torch
import pyro
import pyro.distributions as dist

In [None]:
normal = dist.Normal(loc, scale) # create a normal distribution object
x = normal.sample() # draw a sample from N(0,1)
print("sample", x)
print("log prob", normal.log_prob(x)) # score the sample from N(0,1)

In [None]:
x_pyro = normal.sample((size,))

In [None]:
logp_pyro = normal.log_prob(x_pyro)

In [None]:
cuda = True
device = torch.device("cuda") if torch.cuda.is_available() and cuda else torch.device("cpu")
device

In [None]:
normal = dist.Normal(torch.Tensor([loc]).to(device), torch.Tensor([scale]).to(device))

In [None]:
x_pyro = normal.sample((size,))

In [None]:
logp_pyro = normal.log_prob(x_pyro)

# Models

In [None]:
x = pyro.sample("my_sample", dist.Normal(loc, scale))
print(x)

In [None]:
def weather():
 cloudy = pyro.sample('cloudy', dist.Bernoulli(0.3))
 cloudy = 'cloudy' if cloudy.item() == 1.0 else 'sunny'
 mean_temp = {'cloudy': 55.0, 'sunny': 75.0}[cloudy]
 scale_temp = {'cloudy': 10.0, 'sunny': 15.0}[cloudy]
 temp = pyro.sample('temp', dist.Normal(mean_temp, scale_temp))
 return cloudy, temp

In [None]:
for _ in range(10):
 print(weather())

In [None]:
def ice_cream_sales():
 cloudy, temp = weather()
 expected_sales = 200. if cloudy == 'sunny' and temp > 80.0 else 50.
 ice_cream = pyro.sample('ice_cream', dist.Normal(expected_sales, 10.0))
 return ice_cream.item()

In [None]:
for _ in range(10):
 print(ice_cream_sales())

In [None]:
import seaborn as sb
sb.distplot(np.array([ice_cream_sales() for i in range(1000)]), bins=100)

In [None]:
sb.countplot(np.array([weather()[0] for i in range(1000)]))

In [None]:
sb.distplot(np.array([weather()[1].item() for i in range(1000)]), bins=100)

In [None]:
def geometric(p, t=None):
 if t is None:
 t = 0
 x = pyro.sample("x_{}".format(t), dist.Bernoulli(p))
 if x.item() == 0:
 return x
 else:
 return x + geometric(p, t + 1)


In [None]:
sb.distplot(np.array([geometric(0.1).item() for i in range(1000)]), kde=False)

In [None]:
sb.distplot(np.array([geometric(0.5).item() for i in range(1000)]), kde=False)

In [None]:
sb.distplot(np.array([geometric(0.9).item() for i in range(1000)]), kde=False)

In [None]:
def normal_product(loc, scale):
 z1 = pyro.sample("z1", dist.Normal(loc, scale))
 z2 = pyro.sample("z2", dist.Normal(loc, scale))
 y = z1 * z2
 return y

def make_normal_normal():
 mu_latent = pyro.sample("mu_latent", dist.Normal(0, 1))
 fn = lambda scale: normal_product(mu_latent, scale)
 return fn

In [None]:
sb.distplot(np.array([make_normal_normal()(0.2).item() for i in range(100)]))

# Inference

In [None]:
true_coeffs = torch.tensor([-1., 2., 4.])
data = torch.randn(2000, 3)
labels = dist.Bernoulli(logits=(true_coeffs * data).sum(-1)).sample()

In [None]:
def model(inputs, obs):
 coefs = pyro.sample('beta', dist.Normal(torch.zeros(3), torch.ones(3)))
 y = pyro.sample('y', dist.Bernoulli(logits=(coefs * inputs).sum(-1)), obs=obs)
 return y

In [None]:
from pyro.infer import Importance, EmpiricalMarginal

import_run = Importance(model, num_samples=10000).run(data, labels)
import_run

In [None]:
posterior = pyro.infer.EmpiricalMarginal(import_run, 'beta')
posterior

In [None]:
sample = posterior.sample((10,)).numpy()
sb.distplot(sample[:,0])
sb.distplot(sample[:,1])
sb.distplot(sample[:,2])
sample.mean(axis=0)

In [None]:
from pyro.infer import mcmc

hmc_kernel = mcmc.HMC(model, step_size=0.0855, num_steps=4)
mcmc_run = mcmc.MCMC(hmc_kernel, num_samples=1000, warmup_steps=500).run(data, labels)
mcmc_run

In [None]:
posterior = pyro.infer.EmpiricalMarginal(mcmc_run, 'beta')
sample = posterior.sample((1000,)).numpy()
sb.distplot(sample[:,0])
sb.distplot(sample[:,1])
sb.distplot(sample[:,2])
posterior.mean

# Trace

In [None]:
N = 200 # size of toy data

def build_linear_dataset(N, p=1, w=3, b=1, noise_std=0.01):
 X = np.random.rand(N, p)
 w = w * np.ones(p)
 y = np.matmul(X, w) + np.repeat(b, N) + np.random.normal(0, noise_std, size=N)
 y = y.reshape(N, 1)
 X, y = torch.tensor(X).type(torch.Tensor), torch.tensor(y).type(torch.Tensor)
 data = torch.cat((X, y), 1)
 assert data.shape == (N, p + 1)
 return data

data = build_linear_dataset(N, p=1, w=2.5, b=-1.3, noise_std=0.5)

In [None]:
import matplotlib.pyplot as plt

In [None]:
plt.plot(data[:,0].numpy(), data[:,1].numpy(), '*')

In [None]:
import torch.nn as nn

class RegressionModel(nn.Module):
 def __init__(self, p):
 # p = number of features
 super(RegressionModel, self).__init__()
 self.linear = nn.Linear(p, 1)

 def forward(self, x):
 return self.linear(x)

regression_model = RegressionModel(1)
regression_model

In [None]:
for p in list(regression_model.named_parameters()):
 print(p[0], p[1].shape)

In [None]:
def model(data):
 
 # Create unit dist.Normal priors over the parameters
 zero = torch.zeros(1, 1)
 ten = 10 * torch.ones(1, 1)
 w_prior = dist.Normal(zero, ten)
 b_prior = dist.Normal(zero, ten)
 priors = {'linear.weight': w_prior, 'linear.bias': b_prior}
 noise = 0.1 * torch.ones(data.size(0))
 
 # lift module parameters to random variables sampled from the priors
 lifted_module = pyro.random_module("module", regression_model, priors)
 # sample a regressor (which also samples w and b)
 lifted_reg_model = lifted_module()
 with pyro.iarange("map", N):
 x_data = data[:, :-1]
 y_data = data[:, -1]

 # run the regressor forward conditioned on data
 prediction_mean = lifted_reg_model(x_data).squeeze(-1)
 # condition on the observed data
 pyro.sample("obs",
 dist.Normal(prediction_mean, 0.1 * torch.ones(data.size(0))),
 obs=y_data)

In [None]:
from pyro import poutine

model_trace = poutine.trace(model).get_trace(data)
model_trace

In [None]:
model_trace.stochastic_nodes

In [None]:
model_trace.nodes

In [None]:
model_trace.nodes['_INPUT']

In [None]:
model_trace.nodes['_RETURN']

In [None]:
model_trace.nodes['obs']

In [None]:
model_trace.nodes['module$$$linear.weight']

In [None]:
model_trace.compute_log_prob()
model_trace.log_prob_sum()

In [None]:
model_trace.nodes['module$$$linear.weight']

In [None]:
model_trace.nodes['obs']

# Stochastic Variational Inference

In [None]:
softplus = torch.nn.Softplus()

def guide(data):
 # define our variational parameters
 w_loc = torch.randn(1, 1)
 # note that we initialize our scales to be pretty narrow
 w_log_sig = torch.tensor(-3.0 * torch.ones(1, 1) + 0.05 * torch.randn(1, 1))
 b_loc = torch.randn(1)
 b_log_sig = torch.tensor(-3.0 * torch.ones(1) + 0.05 * torch.randn(1))
 # register learnable params in the param store
 mw_param = pyro.param("guide_mean_weight", w_loc)
 sw_param = softplus(pyro.param("guide_log_scale_weight", w_log_sig))
 
 mb_param = pyro.param("guide_mean_bias", b_loc)
 sb_param = softplus(pyro.param("guide_log_scale_bias", b_log_sig))
 # guide distributions for w and b
 w_dist = dist.Normal(mw_param, sw_param).independent(1)
 b_dist = dist.Normal(mb_param, sb_param).independent(1)
 
 dists = {'linear.weight': w_dist, 'linear.bias': b_dist}
 # overload the parameters in the module with random samples
 # from the guide distributions
 lifted_module = pyro.random_module("module", regression_model, dists)
 # sample a regressor (which also samples w and b)
 return lifted_module()

In [None]:
from pyro.optim import Adam
from pyro.infer import SVI, Trace_ELBO

In [None]:
optim = Adam({"lr": 0.05})
svi = SVI(model, guide, optim, loss=Trace_ELBO())

In [None]:
losses = []

def main(num_iterations = 1000):
 pyro.clear_param_store()
 for j in range(num_iterations):
 # calculate the loss and take a gradient step
 loss = svi.step(data)
 losses.append(loss)
 if j % 100 == 0:
 print("[iteration %04d] loss: %.4f" % (j, loss / float(N)))

In [None]:
main()

In [None]:
plt.plot(losses)

In [None]:
plt.plot(losses[100:])

In [None]:
for name in pyro.get_param_store().get_all_param_names():
 print("[%s]: %.3f" % (name, pyro.param(name).data.numpy()))

In [None]:
pred = 0 
for i in range(100):
 # guide does not require the data
 sampled_reg_model = guide(None)
 # run the regression model and add prediction to total
 pred += sampled_reg_model(data[:,:-1])
# take the average of the predictions
pred /= 100
obs = data[:,-1:]

In [None]:
plt.figure(figsize=(20,10))
plt.plot(obs.detach().numpy()[:200])
plt.plot(pred.detach().numpy()[:200])

In [None]:
loss = nn.MSELoss()
loss(pred, obs)

# Gaussian Processes

In [None]:
import matplotlib.pyplot as plt
import torch
import pyro
import pyro.contrib.gp as gp
import pyro.distributions as dist
from pyro.infer import SVI, Trace_ELBO
from pyro.optim import Adam

def plot(plot_observed_data=False, plot_predictions=False, n_prior_samples=0,
 model=None, kernel=None, n_test=500):

 plt.figure(figsize=(12, 6))
 if plot_observed_data:
 plt.plot(X.numpy(), y.numpy(), 'kx')
 if plot_predictions:
 Xtest = torch.linspace(-0.5, 5.5, n_test) # test inputs
 # compute predictive mean and variance
 with torch.no_grad():
 if type(model) == gp.models.VariationalSparseGP:
 mean, cov = model(Xtest, full_cov=True)
 else:
 mean, cov = model(Xtest, full_cov=True, noiseless=False)
 sd = cov.diag().sqrt() # standard deviation at each input point x
 plt.plot(Xtest.numpy(), mean.numpy(), 'r', lw=2) # plot the mean
 plt.fill_between(Xtest.numpy(), # plot the two-sigma uncertainty about the mean
 (mean - 2.0 * sd).numpy(),
 (mean + 2.0 * sd).numpy(),
 color='C0', alpha=0.3)
 if n_prior_samples > 0: # plot samples from the GP prior
 Xtest = torch.linspace(-0.5, 5.5, n_test) # test inputs
 noise = (model.noise if type(model) != gp.models.VariationalSparseGP
 else model.likelihood.variance)
 cov = kernel.forward(Xtest) + noise.expand(n_test).diag()
 samples = dist.MultivariateNormal(torch.zeros(n_test), covariance_matrix=cov)\
 .sample(sample_shape=(n_prior_samples,))
 plt.plot(Xtest.numpy(), samples.numpy().T, lw=2, alpha=0.4)

 plt.xlim(-0.5, 5.5)

In [None]:
N = 20
X = dist.Uniform(0.0, 5.0).sample(sample_shape=(N,))
y = 0.5 * torch.sin(3*X) + dist.Normal(0.0, 0.2).sample(sample_shape=(N,))

plot(plot_observed_data=True) # let's plot the observed data


In [None]:
kernel = gp.kernels.RBF(input_dim=1, variance=torch.tensor(5.),
 lengthscale=torch.tensor(10.))
gpr = gp.models.GPRegression(X, y, kernel, noise=torch.tensor(1.))

In [None]:
plot(model=gpr, kernel=kernel, n_prior_samples=2)

In [None]:
kernel2 = gp.kernels.RBF(input_dim=1, variance=torch.tensor(10.),
 lengthscale=torch.tensor(.1))
gpr2 = gp.models.GPRegression(X, y, kernel2, noise=torch.tensor(0.1))
plot(model=gpr2, kernel=kernel2, n_prior_samples=2)

In [None]:
optim = Adam({"lr": 0.005})
svi = SVI(gpr.model, gpr.guide, optim, loss=Trace_ELBO())
losses = []
num_steps = 2500
for i in range(num_steps):
 losses.append(svi.step())

In [None]:
plt.plot(losses)

In [None]:
plot(model=gpr, plot_observed_data=True, plot_predictions=True)

In [None]:
gpr.kernel.get_param("variance")

In [None]:
gpr.kernel.get_param("lengthscale")

In [None]:
gpr.get_param("noise").item()

In [None]:
gpr.kernel.set_prior("lengthscale", dist.LogNormal(0.0, 1.0))
gpr.kernel.set_prior("variance", dist.LogNormal(0.0, 1.0))
# we reset the param store so that the previous inference doesn't interfere with this one
pyro.clear_param_store()
optim = Adam({"lr": 0.005})
svi = SVI(gpr.model, gpr.guide, optim, loss=Trace_ELBO())
losses = []
num_steps = 2500
for i in range(num_steps):
 losses.append(svi.step())
plt.plot(losses)

In [None]:
plot(model=gpr, plot_observed_data=True, plot_predictions=True)

In [None]:
for param_name in pyro.get_param_store().get_all_param_names():
 print('{} = {}'.format(param_name, pyro.param(param_name).item()))

In [None]:
N = 1000
X = dist.Uniform(0.0, 5.0).sample(sample_shape=(N,))
y = 0.5 * torch.sin(3*X) + dist.Normal(0.0, 0.2).sample(sample_shape=(N,))
plot(plot_observed_data=True)

In [None]:
Xu = torch.arange(20, dtype=torch.float) / 4.0

# initialize the kernel and model
kernel = gp.kernels.RBF(input_dim=1)
# we increase the jitter for better numerical stability
sgpr = gp.models.SparseGPRegression(X, y, kernel, Xu=Xu, jitter=1.0e-5)

# the way we setup inference is similar to above
pyro.clear_param_store()
optim = Adam({"lr": 0.005})
svi = SVI(sgpr.model, sgpr.guide, optim, loss=Trace_ELBO())
losses = []
num_steps = 2500
for i in range(num_steps):
 losses.append(svi.step())
plt.plot(losses)

In [None]:
# let's look at the inducing points we've learned
print("inducing points:\n{}".format(pyro.param("SGPR$$$Xu").data.numpy()))
# and plot the predictions from the sparse GP
plot(model=sgpr, plot_observed_data=True, plot_predictions=True)

In [None]:
# initialize the inducing inputs
Xu = torch.arange(10, dtype=torch.float) / 2.0

# initialize the kernel, likelihood, and model
kernel = gp.kernels.RBF(input_dim=1)
likelihood = gp.likelihoods.Gaussian()
# turn on "whiten" flag for more stable optimization
vsgp = gp.models.VariationalSparseGP(X, y, kernel, Xu=Xu, likelihood=likelihood, whiten=True)

pyro.clear_param_store()
# instead of defining our own training loop, we will
# use the built-in support provided by the GP module
num_steps = 1500
losses = vsgp.optimize(optimizer=Adam({"lr": 0.01}), num_steps=num_steps)
plt.plot(losses)

In [None]:
plot(model=vsgp, plot_observed_data=True, plot_predictions=True)