Documentation for quiver.Feature

quiver.feature.Feature

Feature partitions data across the memory of different GPUs and the CPU, and performs feature collection with high performance. You need to set device_cache_size to tell Feature how much data it may cache in each GPU's memory. By default, data is partitioned according to device_cache_size; if you want to cache hot data, pass the graph topology csr_topo so that Feature reorders all data by node degree. We expect degree-based reordering to give a higher cache hit rate, and thus better performance, than caching random data.

>>> cpu_tensor = torch.load("cpu_tensor.pt")
>>> feature = Feature(0, device_list=[0, 1], device_cache_size='200M')
>>> feature.from_cpu_tensor(cpu_tensor)
>>> choose_idx = torch.randint(0, feature.size(0), (100,))
>>> selected_feature = feature[choose_idx]
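
If the graph topology is available, you can also pass csr_topo to enable the degree-based reordering described above. A minimal sketch, assuming Feature and CSRTopo are both imported and edge_index is a torch.LongTensor of shape [2, num_edges]:

>>> csr_topo = CSRTopo(edge_index=edge_index)
>>> feature = Feature(0, device_list=[0, 1], device_cache_size='200M',
...                   cache_policy='device_replicate', csr_topo=csr_topo)
>>> feature.from_cpu_tensor(cpu_tensor)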

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| rank | int | device for the feature collection kernel to launch | required |
| device_list | [int] | device list for data placement | required |
| device_cache_size | Union[int, str] | cache data size for each device; accepts strings like 0.9M or 3GB | required |
| cache_policy | str | cache policy for hot data, either device_replicate or p2p_clique_replicate; choose p2p_clique_replicate when you have NVLink between GPUs, otherwise device_replicate | 'device_replicate' |
| csr_topo | quiver.CSRTopo | CSRTopo of the graph, used for feature reordering | required |
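
Internally, the device_cache_size value is converted into a byte budget (see cal_memory_budget_bytes in the source below). A minimal sketch of such a parser, for illustration only; parse_cache_size is a hypothetical helper, not Quiver's actual implementation:

import re

def parse_cache_size(size) -> int:
    """Turn a spec like 0.9M or 3GB into a byte count (illustrative only)."""
    if isinstance(size, int):
        return size
    match = re.fullmatch(r"([\d.]+)\s*([KMG])B?", size.strip(), re.IGNORECASE)
    if match is None:
        raise ValueError(f"invalid cache size: {size!r}")
    value, unit = float(match.group(1)), match.group(2).upper()
    return int(value * {"K": 2**10, "M": 2**20, "G": 2**30}[unit])

assert parse_cache_size("200M") == 200 * 2**20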

from_cpu_tensor(self, cpu_tensor)

Create quiver.Feature from a PyTorch CPU float tensor

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| cpu_tensor | torch.FloatTensor | input CPU tensor | required |
Source code in quiver/feature.py
def from_cpu_tensor(self, cpu_tensor: torch.Tensor):
    """Create quiver.Feature from a pytorh cpu float tensor

    Args:
        cpu_tensor (torch.FloatTensor): input cpu tensor
    """
    if self.cache_policy == "device_replicate":
        cache_memory_budget = self.cal_memory_budget_bytes(
            self.device_cache_size)
        shuffle_ratio = 0.0
    else:
        cache_memory_budget = self.cal_memory_budget_bytes(
            self.device_cache_size) * len(self.topo.p2pClique2Device[0])
        shuffle_ratio = self.cal_size(
            cpu_tensor, cache_memory_budget) / cpu_tensor.size(0)

    print(
        f"LOG>>> {min(100, int(100 * cache_memory_budget / cpu_tensor.numel() / 4))}% data cached"
    )
    if self.csr_topo is not None:
        if self.csr_topo.feature_order is None:
            cpu_tensor, self.csr_topo.feature_order = reindex_feature(
                self.csr_topo, cpu_tensor, shuffle_ratio)
        self.feature_order = self.csr_topo.feature_order.to(self.rank)
    cache_part, self.cpu_part = self.partition(cpu_tensor,
                                               cache_memory_budget)
    self.cpu_part = self.cpu_part.clone()
    if cache_part.shape[0] > 0 and self.cache_policy == "device_replicate":
        for device in self.device_list:
            shard_tensor = ShardTensor(self.rank, ShardTensorConfig({}))
            shard_tensor.append(cache_part, device)
            self.device_tensor_list[device] = shard_tensor

    elif cache_part.shape[0] > 0:
        clique0_device_list = self.topo.p2pClique2Device.get(0, [])
        clique1_device_list = self.topo.p2pClique2Device.get(1, [])

        block_size = self.cal_size(
            cpu_tensor,
            cache_memory_budget // len(self.topo.p2pClique2Device[0]))

        if len(clique0_device_list) > 0:
            print(
                f"LOG>>> GPU {clique0_device_list} belong to the same NUMA Domain"
            )
            shard_tensor = ShardTensor(self.rank, ShardTensorConfig({}))
            cur_pos = 0
            for idx, device in enumerate(clique0_device_list):
                if idx == len(clique0_device_list) - 1:
                    shard_tensor.append(cache_part[cur_pos:], device)
                else:
                    shard_tensor.append(
                        cache_part[cur_pos:cur_pos + block_size], device)
                    cur_pos += block_size

            self.clique_tensor_list[0] = shard_tensor

        if len(clique1_device_list) > 0:
            print(
                f"LOG>>> GPU {clique1_device_list} belong to the same NUMA Domain"
            )
            shard_tensor = ShardTensor(self.rank, ShardTensorConfig({}))
            cur_pos = 0
            for idx, device in enumerate(clique1_device_list):
                if idx == len(clique1_device_list) - 1:
                    shard_tensor.append(cache_part[cur_pos:], device)
                else:
                    shard_tensor.append(
                        cache_part[cur_pos:cur_pos + block_size], device)
                    cur_pos += block_size

            self.clique_tensor_list[1] = shard_tensor

    # Build the CPU part of the tensor
    if self.cpu_part.numel() > 0:
        if self.cache_policy == "device_replicate":
            shard_tensor = self.device_tensor_list.get(
                self.rank, None) or ShardTensor(self.rank,
                                                ShardTensorConfig({}))
            shard_tensor.append(self.cpu_part, -1)
            self.device_tensor_list[self.rank] = shard_tensor
        else:
            clique_id = self.topo.get_clique_id(self.rank)
            shard_tensor = self.clique_tensor_list.get(
                clique_id, None) or ShardTensor(self.rank,
                                                ShardTensorConfig({}))
            shard_tensor.append(self.cpu_part, -1)
            self.clique_tensor_list[clique_id] = shard_tensor
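
To summarize the p2p branch above: the cached rows are sharded across the clique in block_size chunks, the last device takes the remainder, and the tail of the tensor stays on the CPU. A minimal sketch of that split, with split_for_cache as a hypothetical helper, not Quiver's actual code:

import torch

def split_for_cache(tensor: torch.Tensor, cache_rows: int, clique: list):
    """Shard cached rows across a p2p clique; the rest stays on CPU (illustrative)."""
    cache_part, cpu_part = tensor[:cache_rows], tensor[cache_rows:]
    block = cache_rows // len(clique)  # rows per device, like block_size above
    shards = []
    for idx, device in enumerate(clique):
        if idx == len(clique) - 1:
            shards.append(cache_part[idx * block:])  # last device takes the remainder
        else:
            shards.append(cache_part[idx * block:(idx + 1) * block])
    return shards, cpu_part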

new_from_ipc_handle(rank, ipc_handle) classmethod

Create from ipc handle

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| rank | int | device rank for feature collection kernels to launch | required |
| ipc_handle | tuple | ipc handle created from share_ipc | required |

Returns:

| Type | Description |
| --- | --- |
| quiver.Feature | created quiver.Feature |

Source code in quiver/feature.py
@classmethod
def new_from_ipc_handle(cls, rank, ipc_handle):
    """Create from ipc handle

    Args:
        rank (int): device rank for feature collection kernels to launch
        ipc_handle (tuple): ipc handle created from `share_ipc`

    Returns:
        [quiver.Feature]: created quiver.Feature
    """
    gpu_ipc_handle_dict, cpu_part, device_list, device_cache_size, cache_policy, csr_topo = ipc_handle
    feature = cls(rank, device_list, device_cache_size, cache_policy)
    feature.from_gpu_ipc_handle_dict(gpu_ipc_handle_dict, cpu_part)
    if csr_topo is not None:
        feature.feature_order = csr_topo.feature_order.to(rank)
    feature.csr_topo = csr_topo
    return feature

share_ipc(self)

Get ipc handle for multiprocessing

Returns:

| Type | Description |
| --- | --- |
| tuple | ipc handles for ShardTensor, torch.Tensor and Python native objects |

Source code in quiver/feature.py
def share_ipc(self):
    """Get ipc handle for multiprocessing

    Returns:
        tuple: ipc handles for ShardTensor, torch.Tensor and Python native objects
    """
    gpu_ipc_handle_dict = {}
    if self.cache_policy == "device_replicate":
        for device in self.device_tensor_list:
            gpu_ipc_handle_dict[device] = self.device_tensor_list[
                device].share_ipc()[0]
    else:
        for clique_id in self.clique_tensor_list:
            gpu_ipc_handle_dict[clique_id] = self.clique_tensor_list[
                clique_id].share_ipc()[0]

    return gpu_ipc_handle_dict, self.cpu_part, self.device_list, self.device_cache_size, self.cache_policy, self.csr_topo
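
Together, share_ipc and new_from_ipc_handle let you build the Feature once in the parent process and reconstruct it cheaply in each worker. A minimal sketch, assuming one worker per GPU rank and torch.multiprocessing:

import quiver
import torch.multiprocessing as mp

def worker(rank, ipc_handle):
    # Rebuild the Feature in this process from the parent's ipc handle.
    feature = quiver.Feature.new_from_ipc_handle(rank, ipc_handle)
    # ... feature collection / training on this rank ...

if __name__ == '__main__':
    ipc_handle = feature.share_ipc()  # in the parent, after from_cpu_tensor
    mp.spawn(worker, args=(ipc_handle,), nprocs=2)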

size(self, dim)

Get dim size for quiver.Feature

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dim | int | dimension | required |

Returns:

| Type | Description |
| --- | --- |
| int | dimension size for dim |

Source code in quiver/feature.py
def size(self, dim: int):
    """ Get dim size for quiver.Feature

    Args:
        dim (int): dimension 

    Returns:
        int: dimension size for dim
    """
    self.lazy_init_from_ipc_handle()
    if self.cache_policy == "device_replicate":
        shard_tensor = self.device_tensor_list[self.rank]
        return shard_tensor.size(dim)
    else:
        clique_id = self.topo.get_clique_id(self.rank)
        shard_tensor = self.clique_tensor_list[clique_id]
        return shard_tensor.size(dim)

quiver.pyg.sage_sampler.GraphSageSampler

Quiver's GraphSageSampler behaves just like PyG's NeighborSampler but with much higher performance. It can work in UVA mode or GPU mode. Set mode=GPU if you have enough GPU memory to hold the graph's topology data; this offers the best sampling performance. When your graph is too big for GPU memory, set mode=UVA to still sample on the GPU while keeping the data in host memory. UVA mode loses 30%-40% performance compared to GPU mode, but is much faster than CPU sampling (normally 16x-20x) and consumes much less GPU memory than GPU mode.
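
A minimal usage sketch, assuming a CSRTopo built from edge_index and a tensor seeds holding the seed node ids (the parameters are described in the table below):

>>> csr_topo = quiver.CSRTopo(edge_index=edge_index)
>>> sampler = quiver.pyg.GraphSageSampler(csr_topo, sizes=[15, 10], device=0, mode='UVA')
>>> n_id, batch_size, adjs = sampler.sample(seeds)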

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| csr_topo | quiver.CSRTopo | A quiver.CSRTopo for the graph topology | required |
| sizes | [int] | The number of neighbors to sample for each node in each layer. If sizes[l] = -1, all neighbors are included in layer l. | required |
| device | int | Device on which the sample kernel will be launched | required |
| mode | str | Sample mode, one of [UVA, GPU] | 'UVA' |

lazy_from_ipc_handle(ipc_handle) classmethod

Create from ipc handle

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| ipc_handle | tuple | ipc handle obtained by calling share_ipc | required |

Returns:

| Type | Description |
| --- | --- |
| quiver.pyg.GraphSageSampler | Sampler created from the ipc handle |

Source code in quiver/pyg/sage_sampler.py
@classmethod
def lazy_from_ipc_handle(cls, ipc_handle):
    """Create from ipc handle

    Args:
        ipc_handle (tuple): ipc handle obtained by calling `share_ipc`

    Returns:
        quiver.pyg.GraphSageSampler: Sampler created from ipc handle
    """
    csr_topo, sizes, mode = ipc_handle
    return cls(csr_topo, sizes, -1, mode)

sample(self, input_nodes)

Sample k-hop neighbors from input_nodes

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| input_nodes | torch.LongTensor | seed node ids to sample from | required |

Returns:

| Type | Description |
| --- | --- |
| Tuple | results with the same layout as PyG's sampler |

Source code in quiver/pyg/sage_sampler.py
def sample(self, input_nodes):
    """Sample k-hop neighbors from input_nodes

    Args:
        input_nodes (torch.LongTensor): seed node ids to sample from

    Returns:
        Tuple: results with the same layout as PyG's sampler
    """
    self.lazy_init_quiver()
    nodes = input_nodes.to(self.device)
    adjs = []

    batch_size = len(nodes)
    for size in self.sizes:
        out, cnt = self.sample_layer(nodes, size)
        frontier, row_idx, col_idx = self.reindex(nodes, out, cnt)
        row_idx, col_idx = col_idx, row_idx
        edge_index = torch.stack([row_idx, col_idx], dim=0)

        adj_size = torch.LongTensor([
            frontier.size(0),
            nodes.size(0),
        ])
        e_id = torch.tensor([])
        adjs.append(Adj(edge_index, e_id, adj_size))
        nodes = frontier

    return nodes, batch_size, adjs[::-1]

share_ipc(self)

Create ipc handle for multiprocessing

Returns:

| Type | Description |
| --- | --- |
| tuple | ipc handle tuple |

Source code in quiver/pyg/sage_sampler.py
def share_ipc(self):
    """Create ipc handle for multiprocessing

    Returns:
        tuple: ipc handle tuple
    """
    return self.csr_topo, self.sizes, self.mode
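
As with Feature, the sampler can be shared across processes: share_ipc in the parent, lazy_from_ipc_handle in the worker. A minimal sketch, assuming torch.multiprocessing:

import quiver
import torch.multiprocessing as mp

def worker(rank, sampler_ipc):
    # Rebuild the sampler; initialization is deferred until the first sample() call.
    sampler = quiver.pyg.GraphSageSampler.lazy_from_ipc_handle(sampler_ipc)
    # ... sample on this rank ...

if __name__ == '__main__':
    mp.spawn(worker, args=(sampler.share_ipc(),), nprocs=2)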

quiver.utils.CSRTopo

Graph topology in CSR format.

>>> csr_topo = CSRTopo(edge_index=edge_index)
>>> csr_topo = CSRTopo(indptr=indptr, indices=indices)
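>>> csr_topo.node_count, csr_topo.edge_count
>>> csr_topo.degree  # per-node degree as a torch.LongTensor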

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| edge_index | torch.LongTensor, optional | edge_index tensor for the graph topology | None |
| indptr | torch.LongTensor, optional | indptr for the CSR-format graph topology | None |
| indices | torch.LongTensor, optional | indices for the CSR-format graph topology | None |

degree property readonly

Get the degree of each node in this graph

Returns:

| Type | Description |
| --- | --- |
| torch.LongTensor | degree tensor for each node |

edge_count property readonly

Edge count of the graph

Returns:

| Type | Description |
| --- | --- |
| int | edge count |

feature_order property writable

Get the feature order for this graph

Returns:

| Type | Description |
| --- | --- |
| torch.LongTensor | feature order |

indices property readonly

Get indices

Returns:

| Type | Description |
| --- | --- |
| torch.LongTensor | indices |

indptr property readonly

Get indptr

Returns:

| Type | Description |
| --- | --- |
| torch.LongTensor | indptr |

node_count property readonly

Node count of the graph

Returns:

| Type | Description |
| --- | --- |
| int | node count |

quiver.utils.init_p2p(device_list)

Try to enable p2p access between devices in device_list

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| device_list | List[int] | device list | required |
Source code in quiver/utils.py
def init_p2p(device_list: List[int]):
    """Try to enable p2p acess between devices in device_list

    Args:
        device_list (List[int]): device list
    """
    torch_qv.init_p2p(device_list)
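
If you intend to use cache_policy='p2p_clique_replicate' with quiver.Feature, enabling p2p access first is typically needed; a minimal sketch:

>>> from quiver.utils import init_p2p
>>> init_p2p([0, 1])
>>> feature = quiver.Feature(0, device_list=[0, 1], device_cache_size='200M',
...                          cache_policy='p2p_clique_replicate')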

quiver.utils.p2pCliqueTopo

P2P access topology for devices. Normally we use this class to detect the connection topology of GPUs on the machine.

>>> p2p_clique_topo = p2pCliqueTopo([0,1])
>>> print(p2p_clique_topo.info())

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| device_list | [int] | device list for detecting p2p access topology | required |

p2p_clique property readonly

Get all p2p cliques constructed from devices in device_list

Returns:

| Type | Description |
| --- | --- |
| Dict | {clique_id: [devices in this clique]} |

get_clique_id(self, device_id)

Get clique id for device with device_id

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| device_id | int | device id of the device | required |

Returns:

| Type | Description |
| --- | --- |
| int | clique_id of the device |

Source code in quiver/utils.py
def get_clique_id(self, device_id: int):
    """Get clique id for device with device_id 

    Args:
        device_id (int): device id of the device

    Returns:
        int: clique_id of the device
    """
    return self.Device2p2pClique[device_id]

info(self)

Get a string description of the p2p access topology; call info() to check the topology of your GPUs.

Returns:

| Type | Description |
| --- | --- |
| str | p2p access topology for devices in the device list |

Source code in quiver/utils.py
def info(self):
    """Get a string description of the p2p access topology; call `info()` to check the topology of your GPUs.

    Returns:
        str: p2p access topology for devices in the device list
    """
    topo_str = ""
    for clique_idx in self.p2pClique2Device:
        topo_str += f"Devices {self.p2pClique2Device[clique_idx]} support p2p access with each other\n"
    return topo_str