Documentation for quiver.Feature
quiver.feature.Feature
Feature partitions data across different GPUs' memory and CPU memory, and performs feature collection with high performance. You need to set device_cache_size to tell Feature how much data it can cache in GPU memory. By default it partitions data according to your device_cache_size. If you want to cache hot data, pass the graph topology csr_topo so that Feature reorders all data by node degree; we expect this to yield a higher cache hit rate and better performance than caching random data.
>>> cpu_tensor = torch.load("cpu_tensor.pt")
>>> feature = Feature(0, device_list=[0, 1], device_cache_size='200M')
>>> feature.from_cpu_tensor(cpu_tensor)
>>> choose_idx = torch.randint(0, feature.size(0), (100, ))
>>> selected_feature = feature[choose_idx]
Parameters:

Name | Type | Description | Default |
---|---|---|---|
rank | int | device on which the feature collection kernel is launched | required |
device_list | [int] | device list for data placement | required |
device_cache_size | Union[int, str] | cache data size for each device; can be an int in bytes or a string such as `'200M'` | required |
cache_policy | str | cache policy for hot data; either `device_replicate` or `p2p_clique_replicate` | required |
csr_topo | quiver.CSRTopo | CSRTopo of the graph, used for feature reordering | required |
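To enable hot-data caching, build a quiver.CSRTopo and pass it in. A minimal sketch, assuming `edge_index` is a 2 x num_edges LongTensor and `cpu_tensor` holds node features loaded by your own code:

```python
import quiver

# Hypothetical inputs: `edge_index` and `cpu_tensor` come from your dataset.
csr_topo = quiver.CSRTopo(edge_index=edge_index)
feature = quiver.Feature(rank=0,
                         device_list=[0, 1],
                         device_cache_size='200M',
                         cache_policy='device_replicate',
                         csr_topo=csr_topo)
feature.from_cpu_tensor(cpu_tensor)  # data is reordered by node degree before caching
```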
from_cpu_tensor(self, cpu_tensor)
Create quiver.Feature from a PyTorch CPU float tensor
Parameters:

Name | Type | Description | Default |
---|---|---|---|
cpu_tensor | torch.FloatTensor | input CPU tensor | required |
Source code in quiver/feature.py
def from_cpu_tensor(self, cpu_tensor: torch.Tensor):
"""Create quiver.Feature from a pytorh cpu float tensor
Args:
cpu_tensor (torch.FloatTensor): input cpu tensor
"""
if self.cache_policy == "device_replicate":
cache_memory_budget = self.cal_memory_budget_bytes(
self.device_cache_size)
shuffle_ratio = 0.0
else:
cache_memory_budget = self.cal_memory_budget_bytes(
self.device_cache_size) * len(self.topo.p2pClique2Device[0])
shuffle_ratio = self.cal_size(
cpu_tensor, cache_memory_budget) / cpu_tensor.size(0)
print(
f"LOG>>> {min(100, int(100 * cache_memory_budget / cpu_tensor.numel() / 4))}% data cached"
)
if self.csr_topo is not None:
if self.csr_topo.feature_order is None:
cpu_tensor, self.csr_topo.feature_order = reindex_feature(
self.csr_topo, cpu_tensor, shuffle_ratio)
self.feature_order = self.csr_topo.feature_order.to(self.rank)
cache_part, self.cpu_part = self.partition(cpu_tensor,
cache_memory_budget)
self.cpu_part = self.cpu_part.clone()
if cache_part.shape[0] > 0 and self.cache_policy == "device_replicate":
for device in self.device_list:
shard_tensor = ShardTensor(self.rank, ShardTensorConfig({}))
shard_tensor.append(cache_part, device)
self.device_tensor_list[device] = shard_tensor
elif cache_part.shape[0] > 0:
clique0_device_list = self.topo.p2pClique2Device.get(0, [])
clique1_device_list = self.topo.p2pClique2Device.get(1, [])
block_size = self.cal_size(
cpu_tensor,
cache_memory_budget // len(self.topo.p2pClique2Device[0]))
if len(clique0_device_list) > 0:
print(
f"LOG>>> GPU {clique0_device_list} belong to the same NUMA Domain"
)
shard_tensor = ShardTensor(self.rank, ShardTensorConfig({}))
cur_pos = 0
for idx, device in enumerate(clique0_device_list):
if idx == len(clique0_device_list) - 1:
shard_tensor.append(cache_part[cur_pos:], device)
else:
shard_tensor.append(
cache_part[cur_pos:cur_pos + block_size], device)
cur_pos += block_size
self.clique_tensor_list[0] = shard_tensor
if len(clique1_device_list) > 0:
print(
f"LOG>>> GPU {clique1_device_list} belong to the same NUMA Domain"
)
shard_tensor = ShardTensor(self.rank, ShardTensorConfig({}))
cur_pos = 0
for idx, device in enumerate(clique1_device_list):
if idx == len(clique1_device_list) - 1:
shard_tensor.append(cache_part[cur_pos:], device)
else:
shard_tensor.append(
cache_part[cur_pos:cur_pos + block_size], device)
cur_pos += block_size
self.clique_tensor_list[1] = shard_tensor
    # Build the CPU part of the tensor
if self.cpu_part.numel() > 0:
if self.cache_policy == "device_replicate":
shard_tensor = self.device_tensor_list.get(
self.rank, None) or ShardTensor(self.rank,
ShardTensorConfig({}))
shard_tensor.append(self.cpu_part, -1)
self.device_tensor_list[self.rank] = shard_tensor
else:
clique_id = self.topo.get_clique_id(self.rank)
shard_tensor = self.clique_tensor_list.get(
clique_id, None) or ShardTensor(self.rank,
ShardTensorConfig({}))
shard_tensor.append(self.cpu_part, -1)
self.clique_tensor_list[clique_id] = shard_tensor
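As a back-of-envelope illustration of the percentage logged above (the byte interpretation of `'200M'` is our assumption, not taken from `cal_memory_budget_bytes`):

```python
# Hypothetical numbers: float32 features of dimension 128, device_cache_size='200M'.
bytes_per_row = 128 * 4                      # 512 bytes per node feature row
cache_budget = 200 * 1024 ** 2               # assumed parsing of '200M' into bytes
rows_cached = cache_budget // bytes_per_row  # 409600 rows cached per device
```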
new_from_ipc_handle(rank, ipc_handle)
classmethod
Create from ipc handle
Parameters:

Name | Type | Description | Default |
---|---|---|---|
rank | int | device rank for feature collection kernels to launch | required |
ipc_handle | tuple | ipc handle created from `share_ipc` | required |

Returns:

Type | Description |
---|---|
[quiver.Feature] | created quiver.Feature |
Source code in quiver/feature.py
@classmethod
def new_from_ipc_handle(cls, rank, ipc_handle):
"""Create from ipc handle
Args:
rank (int): device rank for feature collection kernels to launch
        ipc_handle (tuple): ipc handle created from `share_ipc`
Returns:
[quiver.Feature]: created quiver.Feature
"""
gpu_ipc_handle_dict, cpu_part, device_list, device_cache_size, cache_policy, csr_topo = ipc_handle
feature = cls(rank, device_list, device_cache_size, cache_policy)
feature.from_gpu_ipc_handle_dict(gpu_ipc_handle_dict, cpu_part)
if csr_topo is not None:
feature.feature_order = csr_topo.feature_order.to(rank)
feature.csr_topo = csr_topo
return feature
share_ipc(self)
Get ipc handle for multiprocessing
Returns:

Type | Description |
---|---|
tuple | ipc handles for ShardTensor, torch.Tensor, and Python native objects |
Source code in quiver/feature.py
def share_ipc(self):
"""Get ipc handle for multiprocessing
Returns:
        tuple: ipc handles for ShardTensor, torch.Tensor, and Python native objects
"""
gpu_ipc_handle_dict = {}
if self.cache_policy == "device_replicate":
for device in self.device_tensor_list:
gpu_ipc_handle_dict[device] = self.device_tensor_list[
device].share_ipc()[0]
else:
for clique_id in self.clique_tensor_list:
gpu_ipc_handle_dict[clique_id] = self.clique_tensor_list[
clique_id].share_ipc()[0]
return gpu_ipc_handle_dict, self.cpu_part, self.device_list, self.device_cache_size, self.cache_policy, self.csr_topo
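A minimal multiprocessing sketch of the IPC flow, assuming `feature` was built as above and that one worker is spawned per device with torch.multiprocessing:

```python
import torch.multiprocessing as mp
import quiver

def worker(rank, ipc_handle):
    # Rebuild the Feature inside the child process from the parent's handle.
    feature = quiver.Feature.new_from_ipc_handle(rank, ipc_handle)
    # ... collect features with feature[idx] ...

if __name__ == '__main__':
    # `feature` is assumed to be a fully built quiver.Feature (see above).
    ipc_handle = feature.share_ipc()
    mp.spawn(worker, args=(ipc_handle,), nprocs=2)
```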
size(self, dim)
Get dim size for quiver.Feature
Parameters:

Name | Type | Description | Default |
---|---|---|---|
dim | int | dimension | required |

Returns:

Type | Description |
---|---|
int | size of dimension `dim` |
Source code in quiver/feature.py
def size(self, dim: int):
""" Get dim size for quiver.Feature
Args:
dim (int): dimension
Returns:
int: dimension size for dim
"""
self.lazy_init_from_ipc_handle()
if self.cache_policy == "device_replicate":
shard_tensor = self.device_tensor_list[self.rank]
return shard_tensor.size(dim)
else:
clique_id = self.topo.get_clique_id(self.rank)
shard_tensor = self.clique_tensor_list[clique_id]
return shard_tensor.size(dim)
quiver.pyg.sage_sampler.GraphSageSampler
Quiver's GraphSageSampler behaves just like PyG's NeighborSampler but with much higher performance. It can work in UVA mode or GPU mode. Set mode=GPU if you have enough GPU memory to hold the graph's topology data; this offers the best sampling performance. When your graph is too big for GPU memory, set mode=UVA to still sample on the GPU while keeping the data in host memory. UVA mode loses 30%-40% performance compared to GPU mode, but it is much faster than CPU sampling (normally 16x~20x) and consumes much less GPU memory than GPU mode.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
csr_topo | quiver.CSRTopo | a quiver.CSRTopo for the graph topology | required |
sizes | [int] | the number of neighbors to sample for each node in each layer; if set to `sizes[l] = -1`, all neighbors are included in layer `l` | required |
device | int | device on which the sample kernel is launched | required |
mode | str | sample mode, choices are [`UVA`, `GPU`] | required |
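A short usage sketch, assuming `edge_index` comes from your dataset and GPU 0 is available (the fanout sizes and batch shape are illustrative):

```python
import torch
import quiver

csr_topo = quiver.CSRTopo(edge_index=edge_index)
sampler = quiver.pyg.GraphSageSampler(csr_topo, sizes=[25, 10], device=0, mode='UVA')

seeds = torch.arange(1024)                      # seed node ids
n_id, batch_size, adjs = sampler.sample(seeds)  # same return layout as PyG's NeighborSampler
```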
lazy_from_ipc_handle(ipc_handle)
classmethod
Create from ipc handle
Parameters:

Name | Type | Description | Default |
---|---|---|---|
ipc_handle | tuple | ipc handle obtained from calling `share_ipc` | required |

Returns:

Type | Description |
---|---|
quiver.pyg.GraphSageSampler | sampler created from the ipc handle |
Source code in quiver/pyg/sage_sampler.py
@classmethod
def lazy_from_ipc_handle(cls, ipc_handle):
"""Create from ipc handle
Args:
        ipc_handle (tuple): ipc handle obtained from calling `share_ipc`
Returns:
quiver.pyg.GraphSageSampler: Sampler created from ipc handle
"""
csr_topo, sizes, mode = ipc_handle
return cls(csr_topo, sizes, -1, mode)
sample(self, input_nodes)
Sample k-hop neighbors from input_nodes
Parameters:

Name | Type | Description | Default |
---|---|---|---|
input_nodes | torch.LongTensor | seed node ids to sample from | required |

Returns:

Type | Description |
---|---|
Tuple | return results are the same as PyG's sampler |
Source code in quiver/pyg/sage_sampler.py
def sample(self, input_nodes):
"""Sample k-hop neighbors from input_nodes
Args:
        input_nodes (torch.LongTensor): seed node ids to sample from
    Returns:
        Tuple: return results are the same as PyG's sampler
"""
self.lazy_init_quiver()
nodes = input_nodes.to(self.device)
adjs = []
batch_size = len(nodes)
for size in self.sizes:
out, cnt = self.sample_layer(nodes, size)
frontier, row_idx, col_idx = self.reindex(nodes, out, cnt)
row_idx, col_idx = col_idx, row_idx
edge_index = torch.stack([row_idx, col_idx], dim=0)
adj_size = torch.LongTensor([
frontier.size(0),
nodes.size(0),
])
e_id = torch.tensor([])
adjs.append(Adj(edge_index, e_id, adj_size))
nodes = frontier
return nodes, batch_size, adjs[::-1]
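A hedged end-to-end sketch of how sample typically pairs with quiver.Feature in a training loop; `train_idx`, `sampler`, `feature`, and `model` are placeholders built as shown elsewhere on this page:

```python
import torch

train_loader = torch.utils.data.DataLoader(train_idx, batch_size=1024, shuffle=True)
for seeds in train_loader:
    n_id, batch_size, adjs = sampler.sample(seeds)  # adjs ordered as in PyG's NeighborSampler
    x = feature[n_id]                               # gather features for all sampled nodes
    # out = model(x, adjs); then loss/backward as in a standard PyG training loop
```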
share_ipc(self)
Create ipc handle for multiprocessing
Returns:

Type | Description |
---|---|
tuple | ipc handle tuple |
Source code in quiver/pyg/sage_sampler.py
def share_ipc(self):
"""Create ipc handle for multiprocessing
Returns:
tuple: ipc handle tuple
"""
return self.csr_topo, self.sizes, self.mode
quiver.utils.CSRTopo
Graph topology in CSR format.
>>> csr_topo = CSRTopo(edge_index=edge_index)
>>> csr_topo = CSRTopo(indptr=indptr, indices=indices)
Parameters:

Name | Type | Description | Default |
---|---|---|---|
edge_index | torch.LongTensor, optional | edge_index tensor for the graph topology | required |
indptr | torch.LongTensor, optional | indptr for the CSR-format graph topology | required |
indices | torch.LongTensor, optional | indices for the CSR-format graph topology | required |
degree
property
readonly
Get degree of each node in this graph
Returns:

Type | Description |
---|---|
[torch.LongTensor] | degree tensor for each node |
edge_count
property
readonly
Edge count of the graph
Returns:

Type | Description |
---|---|
int | edge count |
feature_order
property
writable
Get feature order for this graph
Returns:

Type | Description |
---|---|
torch.LongTensor | feature order |
indices
property
readonly
Get indices
Returns:

Type | Description |
---|---|
torch.LongTensor | indices |
indptr
property
readonly
Get indptr
Returns:

Type | Description |
---|---|
torch.LongTensor | indptr |
node_count
property
readonly
Node count of the graph
Returns:

Type | Description |
---|---|
int | node count |
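A small sketch of building a CSRTopo and reading its properties; the toy `edge_index` below is hypothetical:

```python
import torch
import quiver

# Toy graph with 3 nodes and 4 directed edges.
edge_index = torch.tensor([[0, 0, 1, 2],
                           [1, 2, 2, 0]])
csr_topo = quiver.CSRTopo(edge_index=edge_index)
print(csr_topo.node_count)  # 3
print(csr_topo.edge_count)  # 4
print(csr_topo.degree)      # per-node degree, here tensor([2, 1, 1])
```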
quiver.utils.init_p2p(device_list)
Try to enable p2p access between devices in device_list.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
device_list | List[int] | device list | required |
Source code in quiver/utils.py
def init_p2p(device_list: List[int]):
"""Try to enable p2p acess between devices in device_list
Args:
device_list (List[int]): device list
"""
torch_qv.init_p2p(device_list)
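For example (a sketch; whether p2p can actually be enabled depends on your GPU interconnect):

```python
from quiver.utils import init_p2p

# Try to enable p2p access between GPU 0 and GPU 1.
init_p2p([0, 1])
```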
quiver.utils.p2pCliqueTopo
P2P access topology for devices. Normally we use this class to detect the connection topology of GPUs on the machine.
>>> p2p_clique_topo = p2pCliqueTopo([0,1])
>>> print(p2p_clique_topo.info())
Parameters:

Name | Type | Description | Default |
---|---|---|---|
device_list | [int] | device list for detecting the p2p access topology | required |
p2p_clique
property
readonly
Get all p2p cliques constructed from the devices in device_list
Returns:

Type | Description |
---|---|
Dict | {clique_id: [devices in this clique]} |
get_clique_id(self, device_id)
Get clique id for device with device_id
Parameters:

Name | Type | Description | Default |
---|---|---|---|
device_id | int | device id of the device | required |

Returns:

Type | Description |
---|---|
int | clique_id of the device |
Source code in quiver/utils.py
def get_clique_id(self, device_id: int):
"""Get clique id for device with device_id
Args:
device_id (int): device id of the device
Returns:
int: clique_id of the device
"""
return self.Device2p2pClique[device_id]
info(self)
Get a string description of the p2p access topology. You can call info() to check the topology of your GPUs.
Returns:

Type | Description |
---|---|
str | p2p access topology for devices in the device list |
Source code in quiver/utils.py
def info(self):
"""Get string description for p2p access topology, you can call `info()` to check the topology of your GPUs
Returns:
str: p2p access topology for devices in device list
"""
str = ""
for clique_idx in self.p2pClique2Device:
str += f"Devices {self.p2pClique2Device[clique_idx]} support p2p access with each other\n"
return str