前言
使用场景为:依赖NebulaGraph3.2.0图数据库,对一些数据节点做关联拓线查询,比如输入IP, 可查询展示该IP归属的地理位置、关联的域名、并继续往下根据域名查询解析的URL 、注册信息等,并以为图形的形式进行展示, 技术栈语言为Python。
依赖安装及使用
- 根据图数据库版本, 安装对应的包版本:
pip install nebula3-python==3.1.0
- nebula文档地址: https://docs.nebula-graph.com.cn/3.2.0/3.ngql-guide/7.general-query-statements/2.match/
- git
nebula3-python
地址: https://github.com/vesoft-inc/nebula-python - git python示例项目地址:https://github.com/wey-gu/nebula-corp-rel-search#data-from-backend-side
常用查询
1. python创建nebula连接
from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config
# define a config
config = Config()
config.max_connection_pool_size = 10
# init connection pool
connection_pool = ConnectionPool()
# if the given servers are ok, return true, else return false
ok = connection_pool.init([('127.0.0.1', 9669)], config)
# option 1 control the connection release yourself
# get session from the pool
session = connection_pool.get_session('root', 'nebula')
# select space
session.execute('USE nba')
# show tags
result = session.execute('SHOW TAGS')
print(result)
# release session
session.release()
# option 2 with session_context, session will be released automatically
with connection_pool.session_context('root', 'nebula') as session:
session.execute('USE nba')
result = session.execute('SHOW TAGS')
print(result)
# close the pool
connection_pool.close()
2. 常用nGQL语句及说明
2.1 统计查询
通过以下三步可获取图空间的点、边数量统计信息:
SUBMIT JOB STATS;
SHOW JOB <job_id>;
SHOW STATS;
2.2 常用数据查询语法
最常用match语法查询,更加灵活
match pattern详细语法参考: https://docs.nebula-graph.com.cn/3.2.0/3.ngql-guide/1.nGQL-overview/3.graph-patterns/
-
MATCH (v) RETURN v LIMIT 10
, 任意查询10个点 -
MATCH ()-[e]->() RETURN e limit 10
, 任意查询10条边 -
MATCH (a)-[e]->(b) WHERE id(a)=="xxx" RETURN a, e, b
, 从id=xxx的a点出发,查询一级出边 -
MATCH (a)-[e]-(b) WHERE id(a)=="xxx" RETURN a, e, b
, 取消方向,从id=xxx的a点出发,查询一级入表和出边 -
MATCH (a)-[e*0..2]->(b) WHERE id(a)=="xxx" RETURN a, e, b
, 从id=xxx的a点出发,查询0-2级的出边 -
GO 0 TO 2 STEPS FROM "xxx" OVER <follow> YIELD properties($$)
, 使用GO遍历查询,与上一句作用一致,需要<follow>指定边类型 - ...
更多操作文档
类代码示例
代码包含python flask nebula客户端初始化、关联查询、结果解析
from nebula3.data import DataObject
from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config
import pandas as pd
from typing import Dict, Union
from nebula3.data.ResultSet import ResultSet
from nebula3.mclient import MetaCache
from nebula3.sclient.GraphStorageClient import GraphStorageClient
from collections import defaultdict
class NebulaClient:
"""
nebula 查询客户端,定制化为tl图谱查询
"""
def __init__(self, graph_servers=None, meta_servers=None, **nebula_config):
self.graph_servers = graph_servers
self.meta_servers = meta_servers
self.username = None
self.password = None
self.space = None
self.config = Config()
self.graph_pool = ConnectionPool()
self.graph_storage_client = None
self._init_config(**nebula_config)
def _init_config(self, **nebula_config):
self.username = nebula_config.pop("username", "")
self.password = nebula_config.pop("password", "")
self.space = nebula_config.pop("space", "")
for key, value in nebula_config.items():
setattr(self.config, key, value)
if self.graph_servers:
self.graph_pool.init(self.graph_servers, self.config)
if self.meta_servers:
self.graph_storage_client = GraphStorageClient(MetaCache(self.meta_servers))
def init_app(self, app):
nebula_conf = app.config.get("nebula") or {}
nebula_graphd_conf = nebula_conf.pop("graphd", {})
nebula_metad_conf = nebula_conf.pop("metad", {})
graph_hosts = nebula_graphd_conf.pop("host", [])
meta_hosts = nebula_metad_conf.pop("host", [])
self.graph_servers = [(item.split(":")[0], item.split(":")[1]) for item in graph_hosts]
self.meta_servers = [(item.split(":")[0], item.split(":")[1]) for item in meta_hosts]
self._init_config(**nebula_conf)
def ngql_query(self, gql, space=None):
if not space:
space = self.space
with self.graph_pool.session_context(self.username, self.password) as session:
session.execute(f'USE {space}')
result = session.execute(gql)
return result
def match_id_relation_edge(self, vid, space=None, variable_length_edges: Union[None, tuple, list] = None) -> ResultSet:
"""
space: 命名空间
vid:起始查询vid
variable_length_edges: 指定路径长度范围, 用两个元素的数组或者元祖表示最小->最大长度, 例如[1,2]
return: 边和点的查询集ResultSet
"""
if not space:
space = self.space
if not variable_length_edges:
# 默认只查询一个层级
variable_length_edges = [0, 1]
# 根据起始vid,向下关联查询
gql = f'MATCH (source)-[e*{variable_length_edges[0]}..{variable_length_edges[1]}]->(target) WHERE ( id(source) == "{vid}") RETURN source as source,e,target as target LIMIT 100'
print(f"gql: {gql}")
e_result = self.ngql_query(gql, space=space)
return e_result
def match_id_relation_edge_result_to_df(self, result: ResultSet) -> Union[pd.DataFrame, None]:
"""
build list for each column, and transform to dataframe
"""
if result.is_succeeded():
source_v = result.column_values("source")
edge_v = result.column_values("e")
target_v = result.column_values("target")
d: Dict[str, list] = {
"source": [self._parse_node(source) for source in source_v],
"edge": [self._parse_edge(edge) for edge in edge_v],
"target_v": [self._parse_node(target) for target in target_v]
}
return pd.DataFrame.from_dict(d)
return None
def match_id_relation_edge_result_to_struct(self, result: ResultSet):
if result.is_succeeded() and not result.is_empty():
source_v = result.column_values("source")
edge_v = result.column_values("e")
target_v = result.column_values("target")
source_data = self._parse_node(source_v[0])
target_data_list = [self._parse_node(target) for target in target_v]
relationship = defaultdict(list)
for target in target_data_list:
target_type = target.get("type")
# 若目标点和原点是同一个对象, 不做关联
if target.get("id") == source_data.get("id"):
continue
relationship[target_type].append(target)
source_data.update({
"relation": relationship
})
return source_data
def _parse_node(self, node):
node = node.as_node()
tag = node.tags()[0]
node_parsed = {
"id": node.get_id().as_string(),
"type": tag,
"isAlarm": False,
"info": {k: self._parse_wrapper_value(v) for k, v in node.properties(tag).items()}
}
return node_parsed
def _parse_edge(self, edge):
edge = edge.as_list()[0].as_relationship()
edge_parsed = {
"edge_name": edge.edge_name(),
"info": {k: self._parse_wrapper_value(v) for k, v in edge.properties().items()},
"start_vertex_id": edge.start_vertex_id().as_string(),
"end_vertex_id": edge.end_vertex_id().as_string(),
}
return edge_parsed
@staticmethod
def _parse_wrapper_value(value: DataObject.ValueWrapper):
_value = value.get_value().value
if isinstance(_value, bytes):
return _value.decode(encoding="utf-8")
return _value
def scan_vertex(self, tag_name, space_name=None, *args, **kwargs):
if not space_name:
space_name = self.space
resp = self.graph_storage_client.scan_vertex(
space_name=space_name,
tag_name=tag_name,
*args, **kwargs)
all_res = []
while resp.has_next():
result = resp.next()
for vertex_data in result:
all_res.append(vertex_data)
print(vertex_data)
return all_res
def close(self):
if hasattr(self.graph_pool, "close"):
self.graph_pool.close()
if __name__ == '__main__':
graph_servers = [("localhost", 9669), ("localhost", 9669), ("localhost", 9669)]
meta_servers = [("localhost", 9559), ("localhost", 9559), ("localhost", 9559)]
nebula_tool = NebulaClient(graph_servers=graph_servers, meta_servers=meta_servers, username="root", password="nebula", space="test", max_connection_pool_size=10)
# result = nebula_tool.ngql_query(space="tl_vast", gql='match (v) return v limit 10')
# query_id_relations = 'match (a)-[e]-(b) where id(a) == "002b500b73952c997db130214ef03b26" return e;'
# result = nebula_tool.ngql_query(space="tl_vast", gql='MATCH p = (source_v)-[e*1..1]->(target_v) WHERE ( id(source_v) == "002b500b73952c997db130214ef03b26") RETURN p LIMIT 100')
result = nebula_tool.match_id_relation_edge(vid="002b500b73952c997db130214ef03b26")
df_result = nebula_tool.match_id_relation_edge_result_to_df(result)
output_result = nebula_tool.match_id_relation_edge_result_to_struct(result)
# print(nebula_tool.scan_vertex())
print(result)
print(df_result)
print(output_result)