#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import hashlib
import json
import logging
import warnings
from abc import ABCMeta
from abc import abstractmethod
from copy import deepcopy
from typing import Dict
from typing import List
from typing import Mapping
from typing import Tuple
from typing import Union
from urllib.parse import urlparse

try:
    import vineyard
except (ImportError, TypeError):
    vineyard = None

from graphscope.framework import dag_utils
from graphscope.framework import utils
from graphscope.framework.dag import DAGNode
from graphscope.framework.errors import check_argument
from graphscope.framework.graph_schema import GraphSchema
from graphscope.framework.graph_utils import EdgeLabel
from graphscope.framework.graph_utils import EdgeSubLabel
from graphscope.framework.graph_utils import VertexLabel
from graphscope.framework.operation import Operation
from graphscope.framework.utils import apply_docstring
from graphscope.framework.utils import data_type_to_cpp
from graphscope.proto import attr_value_pb2
from graphscope.proto import graph_def_pb2
from graphscope.proto import types_pb2

logger = logging.getLogger("graphscope")


class GraphInterface(metaclass=ABCMeta):
    """Base Class to derive GraphDAGNode and Graph"""

    def __init__(self):
        # Default graph configuration; subclasses overwrite these from their
        # own constructor arguments.
        self._session = None
        self._directed = True
        self._generate_eid = True
        self._retain_oid = True
        self._oid_type = "int64"
        self._vid_type = "uint64"
        self._vertex_map = graph_def_pb2.GLOBAL_VERTEX_MAP
        self._compact_edges = False
        self._use_perfect_hash = False
        self._extend_label_data = 0

    @property
    def session_id(self):
        raise NotImplementedError

    @abstractmethod
    def add_column(self, results, selector):
        raise NotImplementedError

    @abstractmethod
    def add_vertices(self, vertices, label="_", properties=None, vid_field=0):
        raise NotImplementedError

    @abstractmethod
    def add_edges(
        self,
        edges,
        label="_",
        properties=None,
        src_label=None,
        dst_label=None,
        src_field=0,
        dst_field=1,
    ):
        raise NotImplementedError

    @abstractmethod
    def consolidate_columns(
        self,
        label: str,
        columns: Union[List[str], Tuple[str]],
        result_column: str,
    ):
        raise NotImplementedError

    def is_directed(self):
        return self._directed

    def to_numpy(self, selector, vertex_range=None):
        raise NotImplementedError

    def to_dataframe(self, selector, vertex_range=None):
        raise NotImplementedError

    def save_to(self, path, **kwargs):
        raise NotImplementedError

    @classmethod
    def load_from(cls, path, sess, **kwargs):
        raise NotImplementedError

    @abstractmethod
    def project(self, vertices, edges):
        raise NotImplementedError

    def unload(self):
        """Deprecated; use ``del graph`` instead.

        This base implementation only emits a :class:`DeprecationWarning`
        (always shown, pointing at the caller); it does not unload anything.
        """
        with warnings.catch_warnings():
            warnings.simplefilter("always", DeprecationWarning)
            warnings.warn(
                "The Graph.unload() method has been deprecated, please using the `del` operator instead, i.e., `del graph`",
                DeprecationWarning,
                stacklevel=2,
            )

    def _from_nx_graph(self, g):
        """Create a gs graph from a nx graph.

        Args:
            g (:class:`graphscope.nx.graph`): A nx graph that contains graph data.

        Raises:
            RuntimeError: NX graph and gs graph not in the same session.
            TypeError: Convert a graph view of nx graph to gs graph.

        Returns:
            :class:`graphscope.framework.operation.Operation` that will be used
            to construct a :class:`graphscope.Graph`.

        Examples:

        .. code:: python

            >>> import graphscope as gs
            >>> nx_g = gs.nx.path_graph(10)
            >>> gs_g = gs.Graph(nx_g)
        """
        if self.session_id != g.session_id:
            raise RuntimeError(
                "networkx graph and graphscope graph not in the same session."
            )
        # Graph views carry a reference to the viewed graph in ``_graph``.
        if hasattr(g, "_graph"):
            raise TypeError("graph view can not convert to gs graph")
        return dag_utils.dynamic_to_arrow(g)

    def _from_vineyard(self, vineyard_object):
        """Load a graph from an already existing vineyard graph.

        Args:
            vineyard_object (:class:`vineyard.Object`, :class:`vineyard.ObjectID`
                or :class:`vineyard.ObjectName`): vineyard object, which
                represents a graph.

        Returns:
            :class:`graphscope.framework.operation.Operation`

        Raises:
            TypeError: If ``vineyard_object`` is none of the supported types.
        """
        if isinstance(vineyard_object, vineyard.Object):
            return self._construct_op_from_vineyard_id(vineyard_object.id)
        if isinstance(vineyard_object, vineyard.ObjectID):
            return self._construct_op_from_vineyard_id(vineyard_object)
        if isinstance(vineyard_object, vineyard.ObjectName):
            return self._construct_op_from_vineyard_name(vineyard_object)
        # Fail fast: returning None here would only surface later as an
        # obscure error when the missing op is added to the DAG.
        raise TypeError(
            "Unsupported vineyard object type: %s" % type(vineyard_object).__name__
        )

    def _construct_op_from_vineyard_attrs(self, vineyard_attrs):
        """Build a ``create_graph`` op that loads an existing vineyard graph.

        Args:
            vineyard_attrs (dict): attribute(s) identifying the vineyard
                object, i.e. ``types_pb2.VINEYARD_ID`` or
                ``types_pb2.VINEYARD_NAME`` mapped to its attr value.

        Returns:
            :class:`graphscope.framework.operation.Operation`
        """
        config = {types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(True)}
        config.update(vineyard_attrs)
        # FIXME(hetao) hardcode oid/vid type for codegen, when loading from vineyard
        #
        # the metadata should be retrieved from vineyard
        config[types_pb2.OID_TYPE] = utils.s_to_attr("int64_t")
        config[types_pb2.VID_TYPE] = utils.s_to_attr("uint64_t")
        return dag_utils.create_graph(
            self.session_id, graph_def_pb2.ARROW_PROPERTY, attrs=config
        )

    def _construct_op_from_vineyard_id(self, vineyard_id):
        """Build the load op for a vineyard graph identified by its object id."""
        assert self._session is not None
        return self._construct_op_from_vineyard_attrs(
            {types_pb2.VINEYARD_ID: utils.i_to_attr(int(vineyard_id))}
        )

    def _construct_op_from_vineyard_name(self, vineyard_name):
        """Build the load op for a vineyard graph identified by its object name."""
        assert self._session is not None
        return self._construct_op_from_vineyard_attrs(
            {types_pb2.VINEYARD_NAME: utils.s_to_attr(str(vineyard_name))}
        )

    def _construct_op_of_empty_graph(self):
        """Build a ``create_graph`` op for an empty property graph.

        The op carries this object's graph configuration (directedness, id
        types, vertex map kind, ...) as attributes.
        """
        config = {}
        config[types_pb2.ARROW_PROPERTY_DEFINITION] = attr_value_pb2.AttrValue()
        config[types_pb2.DIRECTED] = utils.b_to_attr(self._directed)
        config[types_pb2.GENERATE_EID] = utils.b_to_attr(self._generate_eid)
        config[types_pb2.RETAIN_OID] = utils.b_to_attr(self._retain_oid)
        config[types_pb2.OID_TYPE] = utils.s_to_attr(self._oid_type)
        config[types_pb2.VID_TYPE] = utils.s_to_attr(self._vid_type)
        config[types_pb2.IS_FROM_VINEYARD_ID] = utils.b_to_attr(False)
        config[types_pb2.IS_FROM_GAR] = utils.b_to_attr(False)
        config[types_pb2.VERTEX_MAP_TYPE] = utils.i_to_attr(self._vertex_map)
        config[types_pb2.COMPACT_EDGES] = utils.b_to_attr(self._compact_edges)
        config[types_pb2.USE_PERFECT_HASH] = utils.b_to_attr(self._use_perfect_hash)
        config[types_pb2.EXTEND_LABEL_DATA] = utils.i_to_attr(self._extend_label_data)
        return dag_utils.create_graph(
            self.session_id, graph_def_pb2.ARROW_PROPERTY, inputs=None, attrs=config
        )
class GraphDAGNode(DAGNode, GraphInterface):
    """A class represents a graph node in a DAG.

    In GraphScope, all operations that generate a new graph will return
    an instance of :class:`GraphDAGNode`, which will be automatically
    executed by :meth:`Session.run` in `eager` mode.

    The following example demonstrates its usage:

    .. code:: python

        >>> # lazy mode
        >>> import graphscope as gs
        >>> sess = gs.session(mode="lazy")
        >>> g = sess.g()
        >>> g1 = g.add_vertices("person.csv","person")
        >>> print(g1) # <graphscope.framework.graph.GraphDAGNode object>
        >>> g2 = sess.run(g1)
        >>> print(g2) # <graphscope.framework.graph.Graph object>

        >>> # eager mode
        >>> import graphscope as gs
        >>> sess = gs.session(mode="eager")
        >>> g = sess.g()
        >>> g1 = g.add_vertices("person.csv","person")
        >>> print(g1) # <graphscope.framework.graph.Graph object>
        >>> del g1
    """

    def __init__(
        self,
        session,
        incoming_data=None,
        oid_type="int64",
        vid_type="uint64",
        directed=True,
        generate_eid=True,
        retain_oid=True,
        vertex_map: Union[str, int] = "global",
        compact_edges=False,
        use_perfect_hash=False,
    ):
        """Construct a :class:`GraphDAGNode` object.

        Args:
            session (:class:`Session`): A graphscope session instance.
            incoming_data: Graph can be initialized through various type of sources,
                which can be one of:

                    - :class:`graphscope.framework.operation.Operation`
                    - :class:`graphscope.nx.Graph`
                    - :class:`graphscope.Graph`
                    - :class:`vineyard.Object`, :class:`vineyard.ObjectId` or :class:`vineyard.ObjectName`

            oid_type: (str, optional): Type of vertex original id. Defaults to "int64".
            vid_type: (str, optional): Type of vertex internal id. Defaults to "uint64".
            directed: (bool, optional): Directed graph or not. Defaults to True.
            generate_eid: (bool, optional): Generate id for each edge when set True. Defaults to True.
            retain_oid: (bool, optional): Keep original ID in vertex table when set True. Defaults to True.
            vertex_map (str, optional): Indicate use global vertex map or local vertex map. Can be "global" or "local".
                Defaults to global.
            compact_edges (bool, optional): Compact edges (CSR) using varint and delta encoding. Defaults to False.
                Note that compact edges helps to half the memory usage of edges in graph data structure, but may cause
                at most 10%~20% performance degeneration in some algorithms. Defaults to False.
            use_perfect_hash (bool, optional): Use perfect hash in vertex map to optimize the memory usage.
                Defaults to False.

        Raises:
            ValueError: If ``oid_type`` or ``vid_type`` is not supported.
        """
        super().__init__()
        self._session = session
        oid_type = utils.normalize_data_type_str(oid_type)
        if oid_type not in ("int32_t", "int64_t", "std::string"):
            raise ValueError("oid_type can only be int32_t, int64_t or string.")
        vid_type = utils.normalize_data_type_str(vid_type)
        if vid_type not in ("uint32_t", "uint64_t"):
            raise ValueError("vid_type can only be uint32_t or uint64_t.")
        self._oid_type = oid_type
        self._vid_type = vid_type
        self._directed = directed
        self._generate_eid = generate_eid
        self._retain_oid = retain_oid
        self._graph_type = graph_def_pb2.ARROW_PROPERTY
        self._vertex_map = utils.vertex_map_type_to_enum(vertex_map)
        self._compact_edges = compact_edges
        self._use_perfect_hash = use_perfect_hash
        # for need to extend label in 'eager mode' when add_vertices and add_edges
        # 0 - not extending label
        # 1 - extend vertex label
        # 2 - extend edge label
        self._extend_label_data = 0

        # list of pair <parent_op_key, VertexLabel/EdgeLabel>
        self._unsealed_vertices_and_edges = list()
        # check for newly added vertices and edges.
        self._v_labels = list()
        self._e_labels = list()
        self._e_relationships = list()
        self._base_graph = None
        # add op to dag
        self._resolve_op(incoming_data)
        self._session.dag.add_op(self._op)

        # statically create the unload op, as the op may change, the
        # unload op should be refreshed as well.
        if self._op is None:
            self._unload_op = None
        else:
            self._unload_op = dag_utils.unload_graph(self)

    def _new_derived_graph_dag_node(self, op, directed=None):
        """Wrap ``op`` in a new :class:`GraphDAGNode` that inherits this node's config.

        Args:
            op: The operation that produces the derived graph.
            directed (bool, optional): Directedness of the new node. Defaults to
                None, meaning "same as this node".

        Returns:
            :class:`GraphDAGNode`: The new node. Its ``_base_graph`` is left
            unset; callers decide which node it derives from.
        """
        return GraphDAGNode(
            self._session,
            op,
            self._oid_type,
            self._vid_type,
            self._directed if directed is None else directed,
            self._generate_eid,
            self._retain_oid,
            self._vertex_map,
            self._compact_edges,
            self._use_perfect_hash,
        )

    @property
    def v_labels(self):
        return self._v_labels

    @v_labels.setter
    def v_labels(self, value):
        self._v_labels = value

    @property
    def e_labels(self):
        return self._e_labels

    @e_labels.setter
    def e_labels(self, value):
        self._e_labels = value

    @property
    def e_relationships(self):
        return self._e_relationships

    @e_relationships.setter
    def e_relationships(self, value):
        self._e_relationships = value

    @property
    def graph_type(self):
        """The type of the graph object.

        Returns:
            type (`types_pb2.GraphType`): the type of the graph.
        """
        return self._graph_type

    @property
    def oid_type(self):
        return utils.normalize_data_type_str(self._oid_type)

    @property
    def vid_type(self):
        return utils.normalize_data_type_str(self._vid_type)

    def _project_to_simple(self, v_prop=None, e_prop=None):
        check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY)
        op = dag_utils.project_to_simple(self, str(v_prop), str(e_prop))
        # construct dag node
        graph_dag_node = self._new_derived_graph_dag_node(op)
        graph_dag_node._base_graph = self
        return graph_dag_node

    def _resolve_op(self, incoming_data):
        """Set ``self._op`` (and possibly ``self._graph_type``) from ``incoming_data``.

        Raises:
            RuntimeError: If the source is in another session or is unsupported.
            NotImplementedError: If ``incoming_data`` is a :class:`GraphDAGNode`.
        """
        if incoming_data is None:
            # create dag node of empty graph
            self._op = self._construct_op_of_empty_graph()
        elif isinstance(incoming_data, Operation):
            self._op = incoming_data
            if self._op.type == types_pb2.PROJECT_TO_SIMPLE:
                self._graph_type = graph_def_pb2.ARROW_PROJECTED
        elif isinstance(incoming_data, Graph):
            self._op = dag_utils.copy_graph(incoming_data)
            self._graph_type = incoming_data.graph_type
        elif isinstance(incoming_data, GraphDAGNode):
            if incoming_data.session_id != self.session_id:
                raise RuntimeError(f"{incoming_data} not in the same session.")
            raise NotImplementedError
        elif vineyard is not None and isinstance(
            incoming_data, (vineyard.Object, vineyard.ObjectID, vineyard.ObjectName)
        ):
            self._op = self._from_vineyard(incoming_data)
        else:
            # Don't import the :code:`NXGraph` in top-level statements to improve the
            # performance of :code:`import graphscope`.
            from graphscope import nx

            if isinstance(incoming_data, nx.classes.graph._GraphBase):
                self._op = self._from_nx_graph(incoming_data)
            else:
                raise RuntimeError("Not supported incoming data.")
        # update the unload op
        self._unload_op = dag_utils.unload_graph(self)

    def to_numpy(self, selector, vertex_range=None):
        """Select some elements of the graph and output to numpy.

        Args:
            selector (str): Select a portion of graph as a numpy.ndarray.
            vertex_range(dict, optional): Slice vertices. Defaults to None.

        Returns:
            :class:`graphscope.framework.context.ResultDAGNode`:
                A result holds the `numpy.ndarray`, evaluated in eager mode.
        """
        # avoid circular import
        from graphscope.framework.context import ResultDAGNode

        check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY)
        vertex_range = utils.transform_vertex_range(vertex_range)
        op = dag_utils.graph_to_numpy(self, selector, vertex_range)
        return ResultDAGNode(self, op)

    def to_dataframe(self, selector, vertex_range=None):
        """Select some elements of the graph and output as a pandas.DataFrame

        Args:
            selector (dict): Select some portions of graph.
            vertex_range (dict, optional): Slice vertices. Defaults to None.

        Returns:
            :class:`graphscope.framework.context.ResultDAGNode`:
                A result holds the `pandas.DataFrame`, evaluated in eager mode.
        """
        # avoid circular import
        from graphscope.framework.context import ResultDAGNode

        check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY)
        check_argument(
            isinstance(selector, Mapping),
            "selector of to dataframe must be a dict",
        )
        selector = json.dumps(selector)
        vertex_range = utils.transform_vertex_range(vertex_range)
        op = dag_utils.graph_to_dataframe(self, selector, vertex_range)
        return ResultDAGNode(self, op)

    def to_directed(self):
        """Return a new node holding a directed version of this graph.

        The new node keeps this node's id types and storage options and
        reports itself as directed.
        """
        op = dag_utils.to_directed(self)
        graph_dag_node = self._new_derived_graph_dag_node(op, directed=True)
        graph_dag_node._base_graph = self
        return graph_dag_node

    def to_undirected(self):
        """Return a new node holding an undirected version of this graph.

        The new node keeps this node's id types and storage options and
        reports itself as undirected.
        """
        op = dag_utils.to_undirected(self)
        graph_dag_node = self._new_derived_graph_dag_node(op, directed=False)
        graph_dag_node._base_graph = self
        return graph_dag_node

    def add_vertices(
        self, vertices, label="_", properties=None, vid_field: Union[int, str] = 0
    ):
        """Add vertices to the graph, and return a new graph.

        Args:
            vertices (Union[str, Loader]): Vertex data source.
            label (str, optional): Vertex label name. Defaults to "_".
            properties (list[str], optional): List of column names loaded as properties. Defaults to None.
            vid_field (int or str, optional): Column index or property name used as id field. Defaults to 0.

        Raises:
            ValueError: If the given value is invalid or conflict with current graph.

        Returns:
            :class:`graphscope.framework.graph.GraphDAGNode`:
                A new graph with vertex added, evaluated in eager mode.
        """
        if self._vertex_map == graph_def_pb2.LOCAL_VERTEX_MAP:
            raise ValueError(
                "Cannot incrementally add vertices to graphs with local vertex map, "
                "please use `graphscope.load_from()` instead."
            )
        if self._compact_edges:
            raise ValueError(
                "Cannot incrementally add vertices to graphs with compacted edges, "
                "please use `graphscope.load_from()` instead."
            )
        if not self._v_labels and self._e_labels:
            raise ValueError("Cannot manually add vertices after inferred vertices.")
        # Recompute the extend flag on every call. It lives on ``self`` (the
        # node handed to ``add_labels_to_graph`` below), so a value left over
        # from an earlier add_vertices/add_edges on this node must not leak in.
        self._extend_label_data = 1 if label in self._v_labels else 0
        if self._extend_label_data == 1:
            warnings.warn(
                f"Label {label} already existed in graph"
                ", origin label data will be extend."
            )
        unsealed_vertices_and_edges = deepcopy(self._unsealed_vertices_and_edges)
        vertex_label = VertexLabel(
            label=label,
            loader=vertices,
            properties=properties,
            vid_field=vid_field,
            id_type=self._oid_type,
            session_id=self._session.session_id,
        )
        unsealed_vertices_and_edges.append((self.op.key, vertex_label))
        v_labels = deepcopy(self._v_labels)
        if self._extend_label_data == 0:
            v_labels.append(label)
        # generate and add a loader op to dag
        loader_op = dag_utils.create_loader(vertex_label)
        self._session.dag.add_op(loader_op)
        # construct add label op
        op = dag_utils.add_labels_to_graph(self, loader_op)
        # construct dag node
        graph_dag_node = self._new_derived_graph_dag_node(op)
        graph_dag_node._v_labels = v_labels
        graph_dag_node._e_labels = self._e_labels
        graph_dag_node._e_relationships = self._e_relationships
        graph_dag_node._unsealed_vertices_and_edges = unsealed_vertices_and_edges
        graph_dag_node._base_graph = self
        return graph_dag_node

    def add_edges(
        self,
        edges,
        label="_e",
        properties=None,
        src_label=None,
        dst_label=None,
        src_field: Union[int, str] = 0,
        dst_field: Union[int, str] = 1,
    ):
        """Add edges to the graph, and return a new graph.

        Here the src_label and dst_label must be both specified or both unspecified,

        i. src_label and dst_label both unspecified and current graph has no vertex label.

           We deduce vertex label from edge table, and set vertex label name to '_'.

        ii. src_label and dst_label both unspecified and current graph has one vertex label.

            We set src_label and dst label to this single vertex label.

        iii. src_label and dst_label both specified and existed in current graph's vertex labels.

        iv. src_label and dst_label both specified and some are not existed in current graph's vertex labels.

            We deduce missing vertex labels from edge tables.

        Args:
            edges (Union[str, Loader]): Edge data source.
            label (str, optional): Edge label name. Defaults to "_e".
            properties (list[str], optional): List of column names loaded as properties. Defaults to None.
            src_label (str, optional): Source vertex label. Defaults to None.
            dst_label (str, optional): Destination vertex label. Defaults to None.
            src_field (int, optional): Column index or name used as src field. Defaults to 0.
            dst_field (int, optional): Column index or name used as dst field. Defaults to 1.

        Raises:
            ValueError: If the given value is invalid or conflict with current graph.

        Returns:
            :class:`graphscope.framework.graph.GraphDAGNode`:
                A new graph with edge added, evaluated in eager mode.
        """
        if self._compact_edges:
            raise ValueError(
                "Cannot incrementally add edges to graphs with compacted edges, "
                "please use `graphscope.load_from()` instead."
            )
        if src_label is None and dst_label is None:
            check_argument(
                len(self._v_labels) <= 1,
                "Ambiguous vertex label, please specify the src_label and dst_label.",
            )
            if len(self._v_labels) == 1:
                src_label = dst_label = self._v_labels[0]
            else:
                src_label = dst_label = "_"

        if src_label is None or dst_label is None:
            raise ValueError(
                "src and dst label must be both specified or either unspecified."
            )

        check_argument(
            src_field != dst_field, "src and dst field cannot refer to the same field"
        )

        # Recompute the extend flag on every call so a stale value from an
        # earlier add_vertices/add_edges on this node does not leak in.
        self._extend_label_data = (
            2 if self.evaluated and label in self._e_labels else 0
        )

        unsealed_vertices = list()
        unsealed_edges = list()

        v_labels = deepcopy(self._v_labels)
        e_labels = deepcopy(self._e_labels)
        relations = deepcopy(self._e_relationships)

        if src_label not in self._v_labels:
            logger.warning("Deducing vertex labels %s", src_label)
            v_labels.append(src_label)

        if src_label != dst_label and dst_label not in self._v_labels:
            logger.warning("Deducing vertex labels %s", dst_label)
            v_labels.append(dst_label)

        parent = self
        if not self.evaluated and label in self.e_labels:
            # aggregate op with the same edge label: rebuild on top of the node
            # recorded with that label and reload whatever was added after it.
            fork = False
            unsealed_vertices_and_edges = list()
            for parent_op_key, unsealed_v_or_e in self._unsealed_vertices_and_edges:
                if (
                    isinstance(unsealed_v_or_e, EdgeLabel)
                    and unsealed_v_or_e.label == label
                ):
                    parent = self._backtrack_graph_dag_node_by_op_key(parent_op_key)
                    # Work on a copy: this EdgeLabel object is still referenced by
                    # this node (and its ancestors); mutating it in place would
                    # leak the new sub label into graphs derived from them later.
                    cur_label = deepcopy(unsealed_v_or_e)
                    cur_label.add_sub_label(
                        EdgeSubLabel(
                            edges,
                            properties,
                            src_label,
                            dst_label,
                            src_field,
                            dst_field,
                            id_type=self._oid_type,
                        )
                    )
                    fork = True
                else:
                    unsealed_vertices_and_edges.append((parent_op_key, unsealed_v_or_e))
                    if fork:
                        # Added after the aggregated label, hence not contained
                        # in ``parent``: it must be loaded again.
                        if isinstance(unsealed_v_or_e, VertexLabel):
                            unsealed_vertices.append(unsealed_v_or_e)
                        else:
                            unsealed_edges.append(unsealed_v_or_e)
            unsealed_edges.append(cur_label)
            unsealed_vertices_and_edges.append((parent.op.key, cur_label))
        else:
            unsealed_vertices_and_edges = deepcopy(self._unsealed_vertices_and_edges)
            e_labels.append(label)
            relations.append([(src_label, dst_label)])
            cur_label = EdgeLabel(label, self._oid_type, self._session.session_id)
            cur_label.add_sub_label(
                EdgeSubLabel(
                    edges,
                    properties,
                    src_label,
                    dst_label,
                    src_field,
                    dst_field,
                    id_type=self._oid_type,
                )
            )
            unsealed_edges.append(cur_label)
            unsealed_vertices_and_edges.append((parent.op.key, cur_label))

        # ``parent`` may be a backtracked ancestor; make it carry the flag
        # computed for this call rather than a stale one of its own.
        if parent is not self:
            parent._extend_label_data = self._extend_label_data

        # generate and add a loader op to dag
        loader_op = dag_utils.create_loader(unsealed_vertices + unsealed_edges)
        self._session.dag.add_op(loader_op)
        # construct add label op
        op = dag_utils.add_labels_to_graph(parent, loader_op)
        # construct dag node
        graph_dag_node = self._new_derived_graph_dag_node(op)
        graph_dag_node._v_labels = v_labels
        graph_dag_node._e_labels = e_labels
        graph_dag_node._e_relationships = relations
        graph_dag_node._unsealed_vertices_and_edges = unsealed_vertices_and_edges
        graph_dag_node._base_graph = parent
        return graph_dag_node

    def consolidate_columns(
        self,
        label: str,
        columns: Union[List[str], Tuple[str]],
        result_column: str,
    ):
        """Consolidate columns of given vertex / edge properties (of same type) into one column.

        For example, if we have a graph with vertex label "person", and edge labels "knows" and "follows",
        and we want to consolidate the "weight0", "weight1" properties of the vertex and both edges into
        a new column "weight", we can do:

        .. code:: python

            >>> g = ...
            >>> g = g.consolidate_columns("person", ["weight0", "weight1"], "weight")
            >>> g = g.consolidate_columns("knows", ["weight0", "weight1"], "weight")
            >>> g = g.consolidate_columns("follows", ["weight0", "weight1"], "weight")

        Args:
            label: the label of the vertex or edge.
            columns (list or tuple of str): the properties of given vertex or edge to be consolidated.
            result_column: the name of the new column.

        Returns:
            :class:`graphscope.framework.graph.GraphDAGNode`:
                A new graph with column consolidated, evaluated in eager mode.
        """
        check_argument(
            isinstance(columns, (list, tuple)),
            "columns must be a list or tuple of strings",
        )
        op = dag_utils.consolidate_columns(self, label, columns, result_column)
        graph_dag_node = self._new_derived_graph_dag_node(op)
        graph_dag_node._base_graph = self
        return graph_dag_node

    def add_column(self, results, selector):
        """Add the results as a column to the graph. Modification rules are given by the selector.

        Args:
            results: A instance of concrete class derive from (:class:`graphscope.framework.context.BaseContextDAGNode`):
                A context that created by doing an app query on a graph, and holds the corresponding results.
            selector (dict): Select results to add as column.
                Format is similar to selectors in :class:`graphscope.framework.context.Context`

        Returns:
            :class:`graphscope.framework.graph.GraphDAGNode`:
                A new graph with new columns, evaluated in eager mode.
        """
        check_argument(
            isinstance(selector, Mapping), "selector of add column must be a dict"
        )
        for value in selector.values():
            results._check_selector(value)
        selector = json.dumps(selector)
        op = dag_utils.add_column(self, results, selector)
        # Inherit the full graph config (oid/vid types, directedness, ...), not
        # just the storage options, so the new node describes the same graph.
        graph_dag_node = self._new_derived_graph_dag_node(op)
        graph_dag_node._base_graph = self
        return graph_dag_node

    def __del__(self):
        # Best effort: the session may already be closed or destroyed.
        try:
            self.session.run(self._unload())
        except Exception:  # pylint: disable=broad-except
            pass

    def _unload(self):
        """Unload this graph from graphscope engine.

        Returns:
            :class:`graphscope.framework.graph.UnloadedGraph`: Evaluated in eager mode.
        """
        return UnloadedGraph(self._session, self._unload_op)

    def project(
        self,
        vertices: Mapping[str, Union[List[str], None]],
        edges: Mapping[str, Union[List[str], None]],
    ):
        """Project a subgraph from the property graph, and return a new graph.
        A graph produced by project just like a normal property graph, and can be projected further.

        Args:
            vertices (dict):
                key is the vertex label name, the value is a list of str, which represents the
                name of properties. Specifically, it will select all properties if value is None.
                Note that, the label of the vertex in all edges you want to project should be included.
            edges (dict):
                key is the edge label name, the value is a list of str, which represents the
                name of properties. Specifically, it will select all properties if value is None.

        Raises:
            ValueError: If ``vertices`` or ``edges`` is a list or a set instead of a dict.

        Returns:
            :class:`graphscope.framework.graph.GraphDAGNode`:
                A new graph projected from the property graph, evaluated in eager mode.
        """
        check_argument(self.graph_type == graph_def_pb2.ARROW_PROPERTY)
        if isinstance(vertices, (list, set)) or isinstance(edges, (list, set)):
            raise ValueError(
                "\nThe project vertices or edges cannot be a set or a list, rather, a dict is expected, \n"
                "where the key is the label name and the value is a list of property name. E.g.,\n"
                "\n"
                " g.project(vertices={'person': ['name', 'age']},\n"
                " edges={'knows': ['weight']})\n"
                "\n"
                "The property list for vertices and edges can be empty if not needed, e.g.,\n"
                "\n"
                " g.project(vertices={'person': []}, edges={'knows': []})\n"
            )
        op = dag_utils.project_arrow_property_graph(
            self, json.dumps(vertices), json.dumps(edges)
        )
        # construct dag node
        graph_dag_node = self._new_derived_graph_dag_node(op)
        graph_dag_node._base_graph = self
        return graph_dag_node
[docs]classGraph(GraphInterface):"""A class for representing metadata of a graph in the GraphScope. A :class:`Graph` object holds the metadata of a graph, such as key, schema, and the graph is directed or not. It is worth noticing that the graph is stored by the backend such as Analytical Engine, Vineyard. In other words, the graph object holds nothing but metadata. The following example demonstrates its usage: .. code:: python >>> import graphscope as gs >>> sess = gs.session() >>> graph = sess.g() >>> graph = graph.add_vertices("person.csv", "person") >>> graph = graph.add_vertices("software.csv", "software") >>> graph = graph.add_edges("knows.csv", "knows", src_label="person", dst_label="person") >>> graph = graph.add_edges("created.csv", "created", src_label="person", dst_label="software") >>> print(graph) >>> print(graph.schema) """
def __init__(
    self,
    graph_node,
):
    """Construct a :class:`Graph` object.

    Wraps ``graph_node`` (an evaluated :class:`GraphDAGNode`). The node's op is
    replaced by a deep copy, marked evaluated, and registered in the session
    DAG. A fresh unload op is created for the node.
    """
    # ``_graph_node`` must be assigned first: ``__getattr__`` delegates to it.
    self._graph_node = graph_node
    node = self._graph_node
    self._session = node.session

    # copy and set op evaluated
    node.op = deepcopy(node.op)
    node.evaluated = True
    node._unload_op = dag_utils.unload_graph(node)
    self._session.dag.add_op(node.op)

    # Engine-side metadata; filled in later by ``update_from_graph_def``.
    self._key = None
    self._vineyard_id = 0
    self._fragments = None
    self._schema = GraphSchema()
    self._detached = False

    # Storage options inherited from the wrapped node.
    self._vertex_map, self._compact_edges, self._use_perfect_hash = (
        graph_node._vertex_map,
        graph_node._compact_edges,
        graph_node._use_perfect_hash,
    )

    self._interactive_instance_list = []
    self._learning_instance_list = []
def update_from_graph_def(self, graph_def):
    """Refresh this graph's metadata from an engine-side ``graph_def``.

    Raises:
        Whatever ``check_argument`` raises when the graph type of
        ``graph_def`` does not match the wrapped node's graph type.
    """
    if graph_def.graph_type == graph_def_pb2.ARROW_FLATTENED:
        self._graph_node._graph_type = graph_def_pb2.ARROW_FLATTENED
    check_argument(
        self._graph_node.graph_type == graph_def.graph_type,
        "Graph type doesn't match {} versus {}".format(
            self._graph_node.graph_type, graph_def.graph_type
        ),
    )
    self._key = graph_def.key
    self._directed = graph_def.directed
    self._is_multigraph = graph_def.is_multigraph
    self._compact_edges = graph_def.compact_edges
    self._use_perfect_hash = graph_def.use_perfect_hash
    vy_info = graph_def_pb2.VineyardInfoPb()
    graph_def.extension.Unpack(vy_info)
    self._vineyard_id = vy_info.vineyard_id
    self._fragments = list(vy_info.fragments)
    self._oid_type = data_type_to_cpp(vy_info.oid_type)
    self._vid_type = data_type_to_cpp(vy_info.vid_type)
    self._generate_eid = vy_info.generate_eid
    self._retain_oid = vy_info.retain_oid
    self._schema_path = vy_info.schema_path
    self._schema.from_graph_def(graph_def)
    self._v_labels = self._schema.vertex_labels
    self._e_labels = self._schema.edge_labels
    self._e_relationships = self._schema.edge_relationships
    # init saved_signature (must be after init schema)
    self._saved_signature = self.signature

def __getattr__(self, name):
    """Delegate unknown attributes to the wrapped :class:`GraphDAGNode`.

    Raises:
        AttributeError: If neither this object nor the wrapped node has ``name``.
    """
    # Read the wrapped node straight from ``__dict__``: going through normal
    # attribute lookup would re-enter ``__getattr__`` (and recurse until
    # RecursionError) on instances created without ``__init__``, e.g. while
    # ``copy``/``pickle`` probe for ``__deepcopy__``/``__setstate__``.
    graph_node = self.__dict__.get("_graph_node")
    if graph_node is not None:
        missing = object()
        value = getattr(graph_node, name, missing)
        if value is not missing:
            return value
    raise AttributeError("{0} not found.".format(name))

@property
def key(self):
    """The key of the corresponding graph in engine."""
    return self._key

@property
def schema(self):
    """Schema of the graph.

    Returns:
        :class:`GraphSchema`: the schema of the graph
    """
    return self._schema

@property
def schema_path(self):
    """Path that Coordinator will write interactive schema path to.

    Returns:
        str: The path contains the schema. for interactive engine.
    """
    return self._schema_path

@property
def signature(self):
    """SHA-256 hex digest of the schema signature combined with the graph key."""
    return hashlib.sha256(
        "{}.{}".format(self._schema.signature(), self._key).encode(
            "utf-8", errors="ignore"
        )
    ).hexdigest()

@property
def op(self):
    return self._graph_node.op

@property
def oid_type(self):
    return self._graph_node.oid_type

@property
def vid_type(self):
    return self._graph_node.vid_type

@property
def template_str(self):
    """C++ fragment type name used for code generation.

    Raises:
        ValueError: If the graph type has no known fragment template.
    """
    # transform str/string to std::string
    oid_type = utils.normalize_data_type_str(self._oid_type)
    vid_type = utils.normalize_data_type_str(self._vid_type)
    vdata_type = utils.data_type_to_cpp(self._schema.vdata_type)
    edata_type = utils.data_type_to_cpp(self._schema.edata_type)
    vertex_map_type = utils.vertex_map_type_to_cpp(self._vertex_map)
    vertex_map_type = f"{vertex_map_type}<{oid_type},{vid_type}>"
    compact_type = "true" if self._compact_edges else "false"
    if self._graph_type == graph_def_pb2.ARROW_PROPERTY:
        template = f"vineyard::ArrowFragment<{oid_type},{vid_type},{vertex_map_type},{compact_type}>"
    elif self._graph_type == graph_def_pb2.ARROW_PROJECTED:
        template = f"gs::ArrowProjectedFragment<{oid_type},{vid_type},{vdata_type},{edata_type},{vertex_map_type},{compact_type}>"  # noqa: E501
    elif self._graph_type == graph_def_pb2.ARROW_FLATTENED:
        template = f"ArrowFlattenedFragment<{oid_type},{vid_type},{vdata_type},{edata_type},{compact_type}>"
    elif self._graph_type == graph_def_pb2.DYNAMIC_PROJECTED:
        template = f"gs::DynamicProjectedFragment<{vdata_type},{edata_type}>"
    else:
        raise ValueError(f"Unsupported graph type: {self._graph_type}")
    return template

@property
def vineyard_id(self):
    """Get the vineyard object_id of this graph.

    Returns:
        str: return vineyard id of this graph
    """
    return self._vineyard_id

@property
def fragments(self):
    return self._fragments

@property
def session_id(self):
    """Get the current session_id.

    Returns:
        str: Return session id that the graph belongs to.
    """
    return self._session.session_id
def detach(self):
    """Keep this graph in vineyard after the Python object goes away.

    Once detached, :meth:`_unload` returns early and leaves the graph in
    place. It can still be reached later by its :code:`ObjectID` or name.
    """
    detached_flag = True
    self._detached = detached_flag
def loaded(self):
    """Tell whether this graph is currently loaded in an active session."""
    session_is_active = self._session.info["status"] == "active"
    return session_is_active and self._key is not None
def __str__(self):
    """Render the graph type followed by one line per vertex label and per edge relation."""
    vertex_section = "\n".join(f"VERTEX: {label}" for label in self._v_labels)
    edge_triples = [
        (edge_label, src, dst)
        for idx, edge_label in enumerate(self._e_labels)
        for src, dst in self._e_relationships[idx]
    ]
    edge_section = "\n".join(
        f"EDGE: {label}\tsrc: {src}\tdst: {dst}" for label, src, dst in edge_triples
    )
    type_name = graph_def_pb2.GraphTypePb.Name(self._graph_type)
    return f"graphscope.Graph\n{type_name}\n{vertex_section}\n{edge_section}"

def __repr__(self):
    return str(self)

def _unload(self):
    """Unload this graph from graphscope engine.

    Does nothing (returns None) when the session is no longer active, the
    graph has no key, or the graph was detached.
    """
    nothing_to_unload = (
        self._session.info["status"] != "active"
        or self._key is None
        or self._detached
    )
    if nothing_to_unload:
        return None
    # close the associated interactive and learning instances
    self._close_interactive_instances()
    self._close_learning_instances()
    # unload the graph
    unloaded = self._session._wrapper(self._graph_node._unload())
    self._key = None
    return unloaded

def __del__(self):
    # cleanly ignore all exceptions, cause session may already closed / destroyed.
    try:
        pending = self._unload()
        self._session.run(pending)
    except Exception:  # pylint: disable=broad-except
        pass

@apply_docstring(GraphDAGNode._project_to_simple)
def _project_to_simple(self, v_prop=None, e_prop=None):
    simple_node = self._graph_node._project_to_simple(v_prop, e_prop)
    return self._session._wrapper(simple_node)
def to_numpy(self, selector, vertex_range=None):
    """Select some elements of the graph and output to numpy.

    Args:
        selector (str): Select a portion of graph as a numpy.ndarray.
        vertex_range(dict, optional): Slice vertices. Defaults to None.

    Returns:
        `numpy.ndarray`

    Raises:
        Whatever ``_check_unmodified`` raises if the graph was modified
        since its metadata was last loaded.
    """
    self._check_unmodified()
    result_node = self._graph_node.to_numpy(selector, vertex_range)
    return self._session._wrapper(result_node)
def to_dataframe(self, selector, vertex_range=None):
    """Export selected portions of the graph as a pandas DataFrame.

    Args:
        selector (dict): Which portions of the graph to export.
        vertex_range (dict, optional): Slice of vertices to export.
            Defaults to None.

    Returns:
        `pandas.DataFrame`
    """
    # Refuse to export if the graph's signature no longer matches the saved one.
    self._check_unmodified()
    node_output = self._graph_node.to_dataframe(selector, vertex_range)
    return self._session._wrapper(node_output)
def to_directed(self):
    """Return a directed representation of the graph.

    If the graph is already directed, it is returned unchanged.

    Returns:
        :class:`Graph`: A directed graph with the same name and nodes, where
        each edge (u, v, data) is replaced by the two directed edges
        (u, v, data) and (v, u, data).
    """
    return (
        self
        if self._directed
        else self._session._wrapper(self._graph_node.to_directed())
    )
def to_undirected(self):
    """Return an undirected representation of the digraph.

    If the graph is already undirected, it is returned unchanged.

    Returns:
        :class:`Graph`: An undirected graph with the same name and nodes. It
        contains edge (u, v, data) if either (u, v, data) or (v, u, data) is
        in the digraph. If both edges exist in the digraph, both are kept;
        check for and correct this manually if desired.
    """
    if self._directed:
        return self._session._wrapper(self._graph_node.to_undirected())
    return self
def is_directed(self):
    """Return whether the graph is directed."""
    return self._directed

def is_multigraph(self):
    """Return whether the graph is a multigraph."""
    return self._is_multigraph

def _check_unmodified(self):
    """Fail (via ``check_argument``) if the signature differs from the saved one."""
    unchanged = self.signature == self._saved_signature
    check_argument(unchanged, "Graph has been modified!")

@staticmethod
def _load_from_graphar(path, sess, **kwargs):
    """Create an ARROW_PROPERTY graph node that loads a GraphAr graph.

    Args:
        path (str): GraphAr graph info path, passed as GRAPH_INFO_PATH.
        sess: Session in which the graph is created.
        **kwargs: Forwarded JSON-encoded as the STORAGE_OPTIONS attribute.

    Returns:
        The session-wrapped ``GraphDAGNode``.
    """
    # GraphAr currently supports only the global vertex map.
    global_vertex_map = utils.vertex_map_type_to_enum("global")
    # NOTE: the oid type is fixed rather than read from the graph info,
    # because GraphAr uses the vertex index as the oid (always int64_t).
    config = {
        types_pb2.OID_TYPE: utils.s_to_attr("int64_t"),
        types_pb2.VID_TYPE: utils.s_to_attr("uint64_t"),
        types_pb2.IS_FROM_VINEYARD_ID: utils.b_to_attr(False),
        types_pb2.IS_FROM_GAR: utils.b_to_attr(True),
        types_pb2.VERTEX_MAP_TYPE: utils.i_to_attr(global_vertex_map),
        types_pb2.COMPACT_EDGES: utils.b_to_attr(False),
        types_pb2.GRAPH_INFO_PATH: utils.s_to_attr(path),
        types_pb2.STORAGE_OPTIONS: utils.s_to_attr(json.dumps(kwargs)),
    }
    create_op = dag_utils.create_graph(
        sess.session_id,
        graph_def_pb2.ARROW_PROPERTY,
        inputs=[],
        attrs=config,
    )
    return sess._wrapper(GraphDAGNode(sess, create_op))
@classmethod
def load_from(cls, uri, sess=None, **kwargs):
    """Load an ArrowProperty graph from a data source.

    The data source can be vineyard serialized files, GraphAr serialized
    files, or another data source supported by graphscope.

    Args:
        uri (str): Either a URI of the form ``<source>+<filesystem>://...``
            describing the data source, e.g.
            "graphar+file:///tmp/graphar/xxx", or a plain path that contains
            serialization files.
        sess (`graphscope.Session`): The session in which the graph will be
            constructed. If None, the default session is used.
        selector (dict, optional): Selects the data to read.
        graphar_store_in_local (bool, optional): Whether the GraphAr data is
            stored locally. Defaults to False.

    Returns:
        `Graph`: A new graph object.

    Raises:
        ValueError: If the URI names an unsupported file system or source, or
            a load option has the wrong type.
    """

    def _check_load_options(load_options):
        for k, v in load_options.items():
            if k == "selector":
                if not isinstance(v, dict):
                    raise ValueError(
                        "selector should be a dict, but got {}".format(type(v))
                    )
            elif k == "graphar_store_in_local":
                if not isinstance(v, bool):
                    raise ValueError(
                        "graphar_store_in_local should be a bool, but got {}".format(v)
                    )

    if sess is None:
        # Imported only when needed, to obtain the default session.
        from graphscope.client.session import get_default_session

        sess = get_default_session()

    uri_str = uri
    parsed = urlparse(uri)
    if parsed.scheme and "+" in parsed.scheme:
        # "<source>+<filesystem>" e.g. "graphar+file"
        scheme_parts = parsed.scheme.split("+")
        source, filesystem = scheme_parts[0], scheme_parts[-1]
        if filesystem not in ["file", "s3", "oss", "hdfs"]:
            raise ValueError(
                "Unknown file system %s, currently only support file, s3, oss and hdfs"
                % filesystem
            )
        path = filesystem + "://" + parsed.netloc + parsed.path
        if source == "graphar":
            _check_load_options(kwargs)
            return cls._load_from_graphar(path, sess, **kwargs)
        # Both values must be formatted into the message (previously "$s" was
        # a typo and uri_str leaked out as a second exception argument).
        raise ValueError("Unknown source %s with uri %s" % (source, uri_str))

    # not a uri string, assume it is a path for deserialization
    op = dag_utils.deserialize_graph(uri_str, sess, **kwargs)
    return sess._wrapper(GraphDAGNode(sess, op))
def save_to(
    self,
    path,
    format="serialization",
    **kwargs,
):
    """Save the graph to a location in the given format.

    Args:
        path (str): The directory path to write the graph to.
        format (str): The output format, "serialization" (default) or
            "graphar".
        selector (dict, optional): Selects the data to write.
        graphar_graph_name (str, optional): The graph name in GraphAr format.
            Defaults to "graph".
        graphar_file_type (str, optional): The GraphAr file type, one of
            "parquet", "orc" or "csv". Defaults to "parquet".
        graphar_vertex_chunk_size (int, optional): The vertex chunk size in
            GraphAr format. Defaults to 2^18.
        graphar_edge_chunk_size (int, optional): The edge chunk size in
            GraphAr format. Defaults to 2^22.
        graphar_store_in_local (bool, optional): Whether to store the GraphAr
            data locally. Defaults to False.

    Returns:
        dict: The "type" (the format) and the "URI" string of the output data.

    Raises:
        ValueError: If the format, the file system scheme of ``path`` (GraphAr
            only), or a GraphAr write option is invalid.
    """

    def _check_write_options(write_options):
        for k, v in write_options.items():
            if k == "graphar_graph_name" and not isinstance(v, str):
                raise ValueError(
                    "graphar_graph_name should be a string, but got {}".format(type(v))
                )
            elif k == "graphar_file_type" and v not in ["parquet", "orc", "csv"]:
                raise ValueError(
                    "graphar_file_type should be one of ['parquet', 'orc', 'csv'], but got {}".format(v)  # noqa: E501
                )
            elif k in ("graphar_vertex_chunk_size", "graphar_edge_chunk_size"):
                # bool is a subclass of int; reject it explicitly so that True
                # is not silently accepted as a chunk size of 1.
                if isinstance(v, bool) or not isinstance(v, int) or v <= 0:
                    raise ValueError(
                        "{} should be a positive integer, but got {}".format(k, v)
                    )
            elif k == "graphar_store_in_local":
                if not isinstance(v, bool):
                    raise ValueError(
                        "graphar_store_in_local should be a bool, but got {}".format(v)
                    )
            elif k == "selector":
                if not isinstance(v, dict):
                    raise ValueError(
                        "selector should be a dict, but got {}".format(type(v))
                    )

    if format == "graphar":
        # default graph name
        kwargs.setdefault("graphar_graph_name", "graph")
        _check_write_options(kwargs)
        graph_name = kwargs["graphar_graph_name"]
        maybe_uri = urlparse(path)
        if maybe_uri.scheme and maybe_uri.scheme not in ["file", "s3", "oss", "hdfs"]:
            raise ValueError(
                "Unknown file system %s, currently only support file, s3, oss and hdfs"
                % maybe_uri.scheme
            )
        if not maybe_uri.scheme:
            # plain paths are treated as local files
            maybe_uri = maybe_uri._replace(scheme="file")
        op = dag_utils.save_to_graphar(self, path, **kwargs)
        self._session.dag.add_op(op)
        self._session._wrapper(op)
        # NOTE(review): path and graph_name are concatenated with no separator,
        # so a path without a trailing "/" yields e.g. ".../dirgraph.graph.yaml"
        # -- confirm whether callers are expected to pass a trailing "/".
        return {
            "type": format,
            "URI": "graphar+" + maybe_uri.geturl() + graph_name + ".graph.yaml",
        }
    if format == "serialization":
        # serialize graph
        op = dag_utils.serialize_graph(self, path, **kwargs)
        self._session.dag.add_op(op)
        self._session._wrapper(op)
        return {"type": format, "URI": path}
    raise ValueError("Unknown format: %s" % format)
@apply_docstring(GraphDAGNode.add_vertices)
def add_vertices(
    self,
    vertices,
    label="_",
    properties=None,
    vid_field: Union[int, str] = 0,
) -> Union["Graph", GraphDAGNode]:
    # The public docstring is presumably supplied by @apply_docstring from
    # GraphDAGNode.add_vertices. Only a loaded graph may be extended.
    if self.loaded():
        new_node = self._graph_node.add_vertices(
            vertices, label, properties, vid_field
        )
        return self._session._wrapper(new_node)
    raise RuntimeError("The graph is not loaded")
@apply_docstring(GraphDAGNode.add_edges)
def add_edges(
    self,
    edges,
    label="_",
    properties=None,
    src_label=None,
    dst_label=None,
    src_field: Union[int, str] = 0,
    dst_field: Union[int, str] = 1,
) -> Union["Graph", GraphDAGNode]:
    # The public docstring is presumably supplied by @apply_docstring from
    # GraphDAGNode.add_edges. Only a loaded graph may be extended.
    if self.loaded():
        new_node = self._graph_node.add_edges(
            edges,
            label,
            properties,
            src_label,
            dst_label,
            src_field,
            dst_field,
        )
        return self._session._wrapper(new_node)
    raise RuntimeError("The graph is not loaded")
@apply_docstring(GraphDAGNode.consolidate_columns)
def consolidate_columns(
    self,
    label: str,
    columns: Union[List[str], Tuple[str]],
    result_column: str,
) -> Union["Graph", GraphDAGNode]:
    # The public docstring is presumably supplied by @apply_docstring from
    # GraphDAGNode.consolidate_columns. Only a loaded graph may be modified.
    if self.loaded():
        consolidated = self._graph_node.consolidate_columns(
            label, columns, result_column
        )
        return self._session._wrapper(consolidated)
    raise RuntimeError("The graph is not loaded")
@apply_docstring(GraphDAGNode.project)
def project(
    self,
    vertices: Mapping[str, Union[List[str], None]],
    edges: Mapping[str, Union[List[str], None]],
) -> Union["Graph", GraphDAGNode]:
    # The public docstring is presumably supplied by @apply_docstring from
    # GraphDAGNode.project. Only a loaded graph may be projected.
    if self.loaded():
        projected = self._graph_node.project(vertices, edges)
        return self._session._wrapper(projected)
    raise RuntimeError("The graph is not loaded")
def _attach_interactive_instance(self, instance):
    """Remember an interactive instance started on this graph.

    Args:
        instance: interactive instance; it is closed later by
            ``_close_interactive_instances``.
    """
    self._interactive_instance_list.append(instance)

def _attach_learning_instance(self, instance):
    """Remember a learning instance created on this graph.

    Args:
        instance: learning instance; it is closed later by
            ``_close_learning_instances``.
    """
    self._learning_instance_list.append(instance)

def _close_interactive_instances(self):
    """Close every attached interactive instance, then forget them all.

    A failure to close one instance is logged and does not stop the others
    from being closed.
    """
    for instance in self._interactive_instance_list:
        try:
            instance.close()
        except Exception:  # pylint: disable=broad-except
            logger.exception("Failed to close interactive instances")
    self._interactive_instance_list.clear()

def _close_learning_instances(self):
    """Close every attached learning instance, then forget them all.

    A failure to close one instance is logged and does not stop the others
    from being closed.
    """
    for instance in self._learning_instance_list:
        try:
            instance.close()
        except Exception:  # pylint: disable=broad-except
            # Previously logged the copy-pasted "interactive" message.
            logger.exception("Failed to close learning instances")
    self._learning_instance_list.clear()
class UnloadedGraph(DAGNode):
    """DAG node that stands for an unloaded graph.

    Constructing it registers the given op with the session's DAG.
    """

    def __init__(self, session, op):
        self._op = op
        self._session = session
        # register the op with the session's DAG
        self._session.dag.add_op(self._op)