Source code for pylops_gpu.LinearOperator

import torch
import numpy as np

from pytorch_complex_tensor import ComplexTensor
from pylops import LinearOperator as pLinearOperator
from pylops_gpu.optimization.cg import cg


[docs]class LinearOperator(pLinearOperator): """Common interface for performing matrix-vector products. This class is an overload of the :py:class:`pylops.LinearOperator` class. It adds functionalities for operators on GPUs; specifically, it allows users specifying when to move model and data from the host to the device and viceversa. Compared to its equivalent PyLops class :class:`pylops.LinearOperator`, it requires input model and data to be :obj:`torch.Tensor` objects. .. note:: End users of PyLops should not use this class directly but simply use operators that are already implemented. This class is meant for developers and it has to be used as the parent class of any new operator developed within PyLops-gpu. Find more details regarding implementation of new operators at :ref:`addingoperator`. Parameters ---------- shape : :obj:`tuple` Operator shape dtype : :obj:`torch.dtype`, optional Type of elements in input array. Op : :obj:`pylops.LinearOperator` Operator to wrap in ``LinearOperator`` (if ``None``, self must implement ``_matvec_`` and ``_rmatvec_``) explicit : :obj:`bool` Operator contains a matrix that can be solved explicitly (``True``) or not (``False``) device : :obj:`str`, optional Device to be used togpu : :obj:`tuple`, optional Move model and data from cpu to gpu prior to applying ``matvec`` and ``rmatvec``, respectively (only when ``device='gpu'``) tocpu : :obj:`tuple`, optional Move data and model from gpu to cpu after applying ``matvec`` and ``rmatvec``, respectively (only when ``device='gpu'``) """ def __init__(self, shape, dtype, Op=None, explicit=False, device='cpu', togpu=(False, False), tocpu=(False, False)): super().__init__(Op=Op, explicit=explicit) self.shape = shape self.dtype = dtype if Op is None: self.Op = None self.device = device self.togpu = togpu self.tocpu = tocpu
[docs] def matvec(self, x): r"""Matrix-vector multiplication. Performs the operation ``y = A * x`` where A is an :math:`N \times M` linear operator and ``x`` is a 1-d array. Parameters ---------- x : :obj:`torch.Tensor` An array with shape (M,) Returns ------- y : :obj:`torch.Tensor` An array with shape (N,) """ # convert x to torch.Tensor if not isinstance(x, torch.Tensor): _tonumpy = True x = torch.from_numpy(x) else: _tonumpy = False # matvec, possibly moving x to gpu and y back to cpu if self.device != 'cpu' and self.togpu[0]: x = x.to(self.device) if self.Op is None: y = self._matvec(x) else: y = self.Op._matvec(x) if self.device != 'cpu' and self.tocpu[0]: y = y.to('cpu') # convert y to numpy when input was numpy if _tonumpy: y = y.numpy() return y
[docs] def rmatvec(self, x): """Adjoint matrix-vector multiplication. Performs the operation ``y = A^H * x`` where A is an :math:`N \times M` linear operator and ``x`` is a 1-d array. Parameters ---------- x : :obj:`torch.Tensor` An array with shape (N,) Returns ------- y : :obj:`torch.Tensor` An array with shape (M,) """ # convert x to torch.Tensor if not isinstance(x, torch.Tensor): _tonumpy = True x = torch.from_numpy(x) else: _tonumpy = False # rmatvec, possibly moving x to gpu and y back to cpu if self.device != 'cpu' and self.togpu[1]: x = x.to(self.device) if self.Op is None: y = self._rmatvec(x) else: y = self.Op._rmatvec(x) if self.device != 'cpu' and self.tocpu[1]: y = y.to('cpu') # convert y to numpy when input was numpy if _tonumpy: y = y.numpy() return y
[docs] def matmat(self, X, kfirst=False): """Matrix-matrix multiplication. Performs the operation ``Y = A * X`` where A is an :math:`N \times M` linear operator and ``X`` is a 2-d array of size :math:`K \times M` (``kfirst=True``) or :math:`M \times K` (``kfirst=False``). Parameters ---------- x : :obj:`torch.Tensor` An array with shape (M, K) or (K, M) kfirst : :obj:`bool`, optional Dimension ``K`` along which the matrix multiplication is performed is in the first dimension (``True``) or in the second dimension (``False``) Returns ------- y : :obj:`torch.Tensor` An array with shape (N, K) or (K, N) """ # convert x to torch.Tensor if not isinstance(X, torch.Tensor): _tonumpy = True X = torch.from_numpy(X) else: _tonumpy = False # matmat, possibly moving x to gpu and y back to cpu if self.device != 'cpu' and self.togpu[0]: X = X.to(self.device) if self.Op is None: Y = self._matmat(X, kfirst=kfirst) else: Y = self.Op._matmat(X, kfirst=kfirst) if self.device != 'cpu' and self.tocpu[0]: Y = Y.to('cpu') # convert y to numpy when input was numpy if _tonumpy: Y = Y.numpy() return Y
[docs] def rmatmat(self, X, kfirst=False): """Adjoint matrix-matrix multiplication. Performs the operation ``Y = A^H * X`` where A is an :math:`N \times M` linear operator and ``X`` is a 2-d array of size :math:`K \times N` (``kfirst=True``) or :math:`N \times K` (``kfirst=False``). Parameters ---------- x : :obj:`torch.Tensor` An array with shape (N, K) or (K, N) kfirst : :obj:`bool`, optional Dimension ``K`` along which the matrix multiplication is performed is in the first dimension (``True``) or in the second dimension (``False``) Returns ------- y : :obj:`torch.Tensor` An array with shape (M, K) or (K, M) """ # convert x to torch.Tensor if not isinstance(X, torch.Tensor): _tonumpy = True X = torch.from_numpy(X) else: _tonumpy = False # matmat, possibly moving x to gpu and y back to cpu if self.device != 'cpu' and self.togpu[0]: X = X.to(self.device) if self.Op is None: Y = self._rmatmat(X, kfirst=kfirst) else: Y = self.Op._rmatmat(X, kfirst=kfirst) if self.device != 'cpu' and self.tocpu[0]: Y = Y.to('cpu') # convert y to numpy when input was numpy if _tonumpy: Y = Y.numpy() return Y
def __call__(self, x): return self * x def _matmat(self, X, kfirst=False): """Matrix-matrix multiplication handler. """ if kfirst: Y = torch.stack([self._matvec(col.view(-1)) for col in X], dim=0) else: Y = torch.stack([self._matvec(col.view(-1)) for col in X.T], dim=0).T return Y def _rmatmat(self, X, kfirst=False): """Adjoint Matrix-matrix multiplication handler. """ if kfirst: Y = torch.stack([self._rmatvec(col.view(-1)) for col in X], dim=0) else: Y = torch.stack([self._rmatvec(col.view(-1)) for col in X.T], dim=0).T return Y def __mul__(self, x): return self.dot(x)
[docs] def dot(self, x): """Matrix-vector multiplication. Parameters ---------- x : :obj:`torch.Tensor` or :obj:`pytorch_complex_tensor.ComplexTensor` 1-d or 2-d array, representing a vector or matrix. Returns ------- Ax : :obj:`torch.Tensor` or :obj:`pytorch_complex_tensor.ComplexTensor` 1-d or 2-d array (depending on the shape of x) that represents the result of applying this linear operator on x. """ if isinstance(x, LinearOperator): return _ProductLinearOperator(self, x) elif np.isscalar(x): return _ScaledLinearOperator(self, x) else: if not isinstance(x, torch.Tensor): ndim = torch.from_numpy(x).ndimension() else: ndim = x.ndimension() if isinstance(x, ComplexTensor): ndim -= 1 if ndim == 1 or ndim == 2 and x.shape[1] == 1: return self.matvec(x) elif ndim == 2: return self.matmat(x) else: raise ValueError('expected 1-d or 2-d array or matrix, got %r' % x)
def __matmul__(self, other): if np.isscalar(other): raise ValueError("Scalar operands are not allowed, " "use '*' instead") return self.__mul__(other) def __rmatmul__(self, other): if np.isscalar(other): raise ValueError("Scalar operands are not allowed, " "use '*' instead") return self.__rmul__(other) def __rmul__(self, x): if np.isscalar(x): return _ScaledLinearOperator(self, x) else: return NotImplemented def __pow__(self, p): if np.isscalar(p): return _PowerLinearOperator(self, p) else: return NotImplemented def __add__(self, x): if isinstance(x, LinearOperator): return _SumLinearOperator(self, x) else: return NotImplemented def __neg__(self): return _ScaledLinearOperator(self, -1) def __sub__(self, x): return self.__add__(-x) def __repr__(self): M, N = self.shape if self.dtype is None: dt = 'unspecified dtype' else: dt = 'dtype=' + str(self.dtype) return '<%dx%d %s with %s>' % (M, N, self.__class__.__name__, dt)
[docs] def adjoint(self): """Hermitian adjoint. Returns the Hermitian adjoint. Can be abbreviated self.H instead of self.adjoint(). """ return self._adjoint()
H = property(adjoint) def _adjoint(self): """Default implementation of _adjoint; defers to rmatvec.""" shape = (self.shape[1], self.shape[0]) return _CustomLinearOperator(shape, matvec=self.rmatvec, rmatvec=self.matvec, dtype=self.dtype, explicit=self.explicit, device=self.device, tocpu=self.tocpu, togpu=self.togpu)
[docs] def div(self, y, niter=100, tol=1e-4): r"""Solve the linear problem :math:`\mathbf{y}=\mathbf{A}\mathbf{x}`. Overloading of operator ``/`` to improve expressivity of `Pylops_gpu` when solving inverse problems. Parameters ---------- y : :obj:`torch.Tensor` Data niter : :obj:`int`, optional Number of iterations (to be used only when ``explicit=False``) tol : :obj:`int` Residual norm tolerance Returns ------- xest : :obj:`np.ndarray` Estimated model """ xest = self.__truediv__(y, niter=niter, tol=tol) return xest
def __truediv__(self, y, niter=None, tol=1e-4): xest = cg(self, y, niter=self.shape[1] if niter is None else niter, tol=tol)[0] return xest
class Identity(LinearOperator): r"""Identity operator. Simply move model to data in forward model and viceversa in adjoint mode if :math:`M = N`. If :math:`M > N` removes last :math:`M - N` elements from model in forward and pads with :math:`0` in adjoint. If :math:`N > M` removes last :math:`N - M` elements from data in adjoint and pads with :math:`0` in forward. Parameters ---------- N : :obj:`int` Number of samples in data (and model, if ``M`` is not provided). M : :obj:`int`, optional Number of samples in model. inplace : :obj:`bool`, optional Work inplace (``True``) or make a new copy (``False``). By default, data is a reference to the model (in forward) and model is a reference to the data (in adjoint). device : :obj:`str`, optional Device to be used togpu : :obj:`tuple`, optional Move model and data from cpu to gpu prior to applying ``matvec`` and ``rmatvec``, respectively (only when ``device='gpu'``) tocpu : :obj:`tuple`, optional Move data and model from gpu to cpu after applying ``matvec`` and ``rmatvec``, respectively (only when ``device='gpu'``) dtype : :obj:`torch.dtype`, optional Type of elements in input array. Attributes ---------- shape : :obj:`tuple` Operator shape explicit : :obj:`bool` Operator contains a matrix that can be solved explicitly (``True``) or not (``False``) Notes ----- Refer to :class:`pylops.basicoperators.Identity` for implementation details. """ def __init__(self, N, M=None, inplace=True, device='cpu', togpu=(False, False), tocpu=(False, False), dtype=torch.float32): M = N if M is None else M self.inplace = inplace self.shape = (N, M) self.device = device self.togpu = togpu self.tocpu = tocpu self.dtype = dtype self.explicit = False self.Op = None def _matvec(self, x): if not self.inplace: x = x.clone() if self.shape[0] == self.shape[1]: y = x elif self.shape[0] < self.shape[1]: y = x[:self.shape[0]] else: y = torch.zeros(self.shape[0], dtype=self.dtype) y[:self.shape[1]] = x return y def _rmatvec(self, x): if not self.inplace: x = x.clone() if self.shape[0] == self.shape[1]: y = x elif self.shape[0] < self.shape[1]: y = torch.zeros(self.shape[1], dtype=self.dtype) y[:self.shape[0]] = x else: y = x[:self.shape[1]] return y class _CustomLinearOperator(LinearOperator): """Linear operator defined in terms of user-specified operations.""" def __init__(self, shape, matvec, rmatvec=None, matmat=None, dtype=None, explicit=None, device='cpu', togpu=(False, False), tocpu=(False, False)): super(_CustomLinearOperator, self).__init__(shape=shape, dtype=dtype, Op=None, explicit=explicit, device=device, togpu=togpu, tocpu=tocpu) self.args = () self.__matvec_impl = matvec self.__rmatvec_impl = rmatvec self.__matmat_impl = matmat self._init_dtype() def _matmat(self, X): if self.__matmat_impl is not None: return self.__matmat_impl(X) else: return super(_CustomLinearOperator, self)._matmat(X) def _matvec(self, x): return self.__matvec_impl(x) def _rmatvec(self, x): func = self.__rmatvec_impl if func is None: raise NotImplementedError("rmatvec is not defined") return self.__rmatvec_impl(x) def _adjoint(self): return _CustomLinearOperator(shape=(self.shape[1], self.shape[0]), matvec=self.__rmatvec_impl, rmatvec=self.__matvec_impl, dtype=self.dtype) class _SumLinearOperator(LinearOperator): def __init__(self, A, B): if not isinstance(A, pLinearOperator) or \ not isinstance(B, pLinearOperator): raise ValueError('both operands have to be a LinearOperator') if A.shape != B.shape: raise ValueError('cannot add %r and %r: shape mismatch' % (A, B)) self.args = (A, B) super(_SumLinearOperator, self).__init__(shape=A.shape, dtype=A.dtype, Op=None, explicit=A.explicit, device=A.device, togpu=A.togpu, tocpu=A.tocpu) def _matvec(self, x): return self.args[0].matvec(x) + self.args[1].matvec(x) def _rmatvec(self, x): return self.args[0].rmatvec(x) + self.args[1].rmatvec(x) def _matmat(self, x): return self.args[0].matmat(x) + self.args[1].matmat(x) def _adjoint(self): A, B = self.args return A.H + B.H class _ProductLinearOperator(LinearOperator): def __init__(self, A, B): if not isinstance(A, pLinearOperator) or \ not isinstance(B, pLinearOperator): raise ValueError('both operands have to be a LinearOperator') if A.shape[1] != B.shape[0]: raise ValueError('cannot multiply %r and %r: shape mismatch' % (A, B)) super(_ProductLinearOperator, self).__init__(shape=(A.shape[0], B.shape[1]), dtype=A.dtype, Op=None, explicit=A.explicit, device=A.device, togpu=A.togpu, tocpu=A.tocpu) self.args = (A, B) def _matvec(self, x): return self.args[0].matvec(self.args[1].matvec(x)) def _rmatvec(self, x): return self.args[1].rmatvec(self.args[0].rmatvec(x)) def _matmat(self, x): return self.args[0].matmat(self.args[1].matmat(x)) def _adjoint(self): A, B = self.args return B.H * A.H class _ScaledLinearOperator(LinearOperator): def __init__(self, A, alpha): if not isinstance(A, pLinearOperator): raise ValueError('LinearOperator expected as A') if not np.isscalar(alpha): raise ValueError('scalar expected as alpha') super(_ScaledLinearOperator, self).__init__(shape=A.shape, dtype=A.dtype, Op=None, explicit=A.explicit, device=A.device, togpu=A.togpu, tocpu=A.tocpu) self.args = (A, alpha) def _matvec(self, x): return self.args[1] * self.args[0].matvec(x) def _rmatvec(self, x): return np.conj(self.args[1]) * self.args[0].rmatvec(x) def _matmat(self, x): return self.args[1] * self.args[0].matmat(x) def _adjoint(self): A, alpha = self.args return A.H * np.conj(alpha) class _PowerLinearOperator(LinearOperator): def __init__(self, A, p): if not isinstance(A, pLinearOperator): raise ValueError('LinearOperator expected as A') if A.shape[0] != A.shape[1]: raise ValueError('square LinearOperator expected, got %r' % A) if not np.issubdtype(type(p), int) or p < 0: raise ValueError('non-negative integer expected as p') super(_PowerLinearOperator, self).__init__(shape=A.shape, dtype=A.dtype, Op=None, explicit=A.explicit, device=A.device, togpu=A.togpu, tocpu=A.tocpu) self.args = (A, p) def _power(self, fun, x): res = x.clone() if isinstance(x, ComplexTensor): res = ComplexTensor(res) for i in range(self.args[1]): res = fun(res) return res def _matvec(self, x): return self._power(self.args[0].matvec, x) def _rmatvec(self, x): return self._power(self.args[0].rmatvec, x) def _matmat(self, x): return self._power(self.args[0].matmat, x) def _adjoint(self): A, p = self.args return A.H ** p