import os
import ssl
import sys
import torch
import errno
import urllib
import pickle
import zipfile
import numpy as np
import os.path as osp
from tqdm import tqdm
from typing import Optional
from texttable import Texttable
from sklearn.model_selection import train_test_split
[docs]def open_pkl_file(file_path):
r"""
Open pickle file.
Parameters
----------
file_path: str
File path for loading pickle files.
"""
with open(file_path, 'rb') as file:
file_content = pickle.load(file)
return file_content
[docs]def save_pkl_file(file_path, contents):
r"""
Save pickle file.
Parameters
----------
file_path: str
File path for saving pickle files.
contents: list
Contents for saving.
"""
with open(file_path, 'wb') as file:
pickle.dump(contents, file)
print("having saved pkl...")
[docs]def open_txt_file(file_path):
r"""
Open txt file.
Parameters
----------
file_path: str
File path for loading txt files.
"""
with open(file_path, 'r') as file:
contents = [line.rstrip("\n") for line in file.readlines()]
return contents
[docs]def save_txt_file(file_path, contents):
r"""
Save txt file.
Parameters
----------
file_path: str
File path for saving txt files.
contents: list
Contents for saving.
"""
with open(file_path, 'w') as file:
for paragraph in contents:
file.write(paragraph + "\n")
print("having saved txt...")
def dcg_at_k(r, k):
r = np.asfarray(r)[:k]
if r.size:
return r[0] + np.sum(r[1:] / np.log2(np.arange(2, r.size + 1)))
return 0.
[docs]def ndcg_at_k(r, k):
r"""
Compute the Normalized Discounted Cumulative Gain (NDCG) at rank k.
Parameters
----------
r: list
A list of relevance scores representing the ranking of items.
k: int
The rank at which to compute NDCG.
Returns
-------
float
The Normalized Discounted Cumulative Gain (NDCG) value.
"""
dcg_max = dcg_at_k(sorted(r, reverse=True), k)
if not dcg_max:
return 0.
return dcg_at_k(r, k) / dcg_max
[docs]def mean_reciprocal_rank(rs):
r"""
Compute the Mean Reciprocal Rank (MRR) for a list of relevance scores.
Parameters
----------
rs: list of arrays
A list of relevance score arrays where each array represents the indices of relevant items.
Returns
-------
list
A list of MRR values for each query.
"""
rs = (np.asarray(r).nonzero()[0] for r in rs)
return [1. / (r[0] + 1) if r.size else 0. for r in rs]
[docs]def args_print(args):
r"""
Print argments.
Parameters
----------
args: object
args
"""
_dict = vars(args)
t = Texttable()
t.add_row(["Parameter", "Value"])
for k in _dict:
t.add_row([k, _dict[k]])
print(t.draw())
def makedirs(path: str):
r"""Recursive directory creation function."""
try:
os.makedirs(osp.expanduser(osp.normpath(path)))
except OSError as e:
if e.errno != errno.EEXIST and osp.isdir(path):
raise
[docs]def download_url(url: str, folder: str, log: bool = True,
filename: Optional[str] = None):
r"""Downloads the content of an URL to a specific folder.
Parameters
----------
url: str
The url.
folder: str
The folder.
log: bool, optional
If :obj:`False`, will not print anything to the
console. (default: :obj:`True`)
filename: str, optional
The name of the file.
"""
if filename is None:
filename = url.rpartition('/')[2]
filename = filename if filename[0] == '?' else filename.split('?')[0]
if os.environ.get('GGL_GITHUB_PROXY') == 'TRUE' and ('raw.githubusercontent.com' in url or 'github.com' in url):
url = 'https://ghproxy.com/' + url
path = osp.join(folder, filename)
if osp.exists(path): # pragma: no cover
if log:
print(f'Using existing file {filename}', file=sys.stderr)
return path
if log:
print(f'Downloading {url}', file=sys.stderr)
makedirs(folder)
context = ssl._create_unverified_context()
response = urllib.request.urlopen(url, context=context)
file_size = response.getheader('Content-Length', '0')
# print(f"downloading {filename} ...")
file_size = int(file_size)
if file_size == 0:
print(f"Remote file size not found.")
with open(path, 'wb') as f:
# workaround for https://bugs.python.org/issue42853
# add download progress bar
with tqdm(total=file_size, unit='B', unit_divisor=1024, unit_scale=True, desc=f'{filename}') as pbar:
chunk_size = 10 * 1024 * 1024
while True:
chunk = response.read(chunk_size)
if not chunk:
break
f.write(chunk)
pbar.update(chunk_size)
return path
[docs]def download_google_url(id: str, folder: str,
filename: str, log: bool = True):
r"""Downloads the content of a Google Drive ID to a specific folder."""
url = f'https://drive.usercontent.google.com/download?id={id}&confirm=t'
return download_url(url, folder, log, filename)
def parse_npz(f):
r"""Parse a npz file."""
# see in gammagl
pass
[docs]def read_npz(path):
r"""Read a npz file."""
with np.load(path) as f:
return parse_npz(f)
# parse_npz: see in gammagl
def maybe_log(path, log=True):
r"""Prints the path if log is True"""
if log:
print(f'Extracting {path}', file=sys.stderr)
[docs]def get_train_val_test_split(graph, train_ratio, val_ratio):
"""
Split the dataset into train, validation, and test sets.
Parameters
----------
graph :
The graph to split.
train_ratio : float
The proportion of the dataset to include in the train split.
val_ratio : float
The proportion of the dataset to include in the validation split.
Returns
-------
:class:`tuple` of :class:`tensor`
"""
random_state = np.random.RandomState(0)
num_samples = graph.num_nodes
all_indices = np.arange(num_samples)
# split into train and (val + test)
train_indices, val_test_indices = train_test_split(
all_indices, train_size=train_ratio, random_state=random_state
)
# calculate the ratio of validation and test splits in the remaining data
test_ratio = 1.0 - train_ratio - val_ratio
val_size_ratio = val_ratio / (val_ratio + test_ratio)
# split val + test into validation and test sets
val_indices, test_indices = train_test_split(
val_test_indices, train_size=val_size_ratio, random_state=random_state
)
return generate_masks(num_samples, train_indices, val_indices, test_indices)
[docs]def generate_masks(num_nodes, train_indices, val_indices, test_indices):
np_train_mask = np.zeros(num_nodes, dtype=bool)
np_train_mask[train_indices] = 1
np_val_mask = np.zeros(num_nodes, dtype=bool)
np_val_mask[val_indices] = 1
np_test_mask = np.zeros(num_nodes, dtype=bool)
np_test_mask[test_indices] = 1
train_mask = torch.tensor(np_train_mask, dtype=torch.bool)
val_mask = torch.tensor(np_val_mask, dtype=torch.bool)
test_mask = torch.tensor(np_test_mask, dtype=torch.bool)
return train_mask, val_mask, test_mask