Source code for quantlaw.utils.networkx

import os

import networkx as nx
import pandas as pd


[docs]def induced_subgraph( G, filter_type, filter_attribute, filter_values, ignore_attrs=False ): """ Create custom induced subgraph. Args: filter_type: 'node' or 'edge' filter_attribute: attribute to filter on filter_values: attribute values to evaluate to `True` """ G = nx.MultiDiGraph(G) if filter_type == "node": nodes = [ n for n in G.nodes() if G.nodes[n].get(filter_attribute) in filter_values ] sG = nx.induced_subgraph(G, nodes) elif filter_type == "edge": sG = nx.MultiDiGraph() sG.add_nodes_from(G.nodes(data=not ignore_attrs)) sG.add_edges_from( [ (e[0], e[1], e[-1]) for e in G.edges(data=True) if e[-1][filter_attribute] in filter_values ] ) else: raise sG.graph["name"] = "_".join( [G.graph["name"], filter_type, filter_attribute, str(*filter_values)] ) return sG
[docs]def hierarchy_graph(G: nx.DiGraph, ignore_attrs=False): """ Remove reference edges from G. Wrapper around induced_subgraph. """ hG = induced_subgraph(G, "edge", "edge_type", ["containment"], ignore_attrs) return hG
[docs]def multi_to_weighted(G: nx.MultiDiGraph): """ Converts a multidigraph into a weighted digraph. """ nG = nx.DiGraph(G) # nG.add_nodes_from(G.nodes) nG.name = G.name + "_weighted_nomulti" edge_weights = {(u, v): 0 for u, v, k in G.edges} for u, v, key in G.edges: edge_weights[(u, v)] += 1 # nG.add_edges_from(edge_weights.keys()) nx.set_edge_attributes(nG, edge_weights, "weight") return nG
[docs]def get_leaves(G: nx.DiGraph): """ Args: G: A tree as directed graph with edges from root to leaves Returns: Set of leaves of the tree G """ H = hierarchy_graph(G, ignore_attrs=True) return set([node for node in H.nodes if H.out_degree(node) == 0])
[docs]def decay_function(key: int): """ Returns a decay function to create a weighted sequence graph. """ return lambda x: (x - 1) ** (-key)
[docs]def sequence_graph( G: nx.MultiDiGraph, seq_decay_func=decay_function(1), seq_ref_ratio=1 ): """ Creates sequence graph for G, consisting of seqitems and their cross-references only, where neighboring seqitems are connected via edges in both directions. Args: seq_decay_func: function to calculate sequence edge weight based on distance between neighboring nodes seq_ref_ratio: ratio between a sequence edge weight when nodes in the sequence are at minimum distance from each other and a reference edge weight """ hG = hierarchy_graph(G, ignore_attrs=True) # make sure we get _all_ seqitems as leaves, not only the ones without outgoing # references leaves = [n for n in hG.nodes() if hG.out_degree(n) == 0] sG = nx.MultiDiGraph(nx.induced_subgraph(G, leaves)) if seq_ref_ratio: nx.set_edge_attributes(sG, 1 / seq_ref_ratio, name="weight") node_headings = dict(sG.nodes(data="heading")) ordered_seqitems = sorted(list(node_headings.keys())) # connect neighboring seqitems sequentially new_edges = get_new_edges(G, ordered_seqitems, seq_decay_func) sG.add_edges_from(new_edges) else: nx.set_edge_attributes(sG, 1, name="weight") sG.graph["name"] = f'{G.graph["name"]}_sequence_graph_seq_ref_ratio_{seq_ref_ratio}' return sG
[docs]def get_new_edges(G, ordered_seqitems, seq_decay_func): """ Convenience function to avoid list comprehension over four lines. """ there = [] back = [] hG = hierarchy_graph(G).to_undirected() for idx, n in enumerate(ordered_seqitems[:-1]): next_item = ordered_seqitems[idx + 1] if ( n.split("_")[0] == next_item.split("_")[0] ): # n and next_item are in the same law distance = nx.shortest_path_length(hG, source=n, target=next_item) weight = seq_decay_func(distance) there.append( ( n, next_item, {"edge_type": "sequence", "weight": weight, "backwards": False}, ) ) back.append( ( next_item, n, {"edge_type": "sequence", "weight": weight, "backwards": True}, ) ) return there + back
[docs]def quotient_graph( G, node_attribute, edge_types=["reference", "cooccurrence"], self_loops=False, root_level=-1, aggregation_attrs=("chars_n", "chars_nowhites", "tokens_n", "tokens_unique"), ): """ Generate the quotient graph with all nodes sharing the same node_attribute condensed into a single node. Simplest use case is aggregation by law_name. """ # node_key:attribute_value map attribute_data = dict(G.nodes(data=node_attribute)) # set cluster -1 if they were not part of the clustering # (guess: those are empty laws) attribute_data = { k: (v if v is not None else -1) for k, v in attribute_data.items() } # remove the root if root_level is given root = None if root_level is not None: roots = [x for x in G.nodes() if G.nodes[x]["level"] == root_level] if roots: root = roots[0] # unique values in that map with root node unique_values = sorted({v for k, v in attribute_data.items() if k != root}) # build a new MultiDiGraph nG = nx.MultiDiGraph() # add nodes new_nodes = {x: [] for x in unique_values} nG.add_nodes_from(unique_values) # sort nodes into buckets for n in attribute_data.keys(): if n != root: mapped_to = attribute_data[n] new_nodes[mapped_to].append(n) if G.nodes[n].get("heading") == mapped_to: for x in G.nodes[n].keys(): nG.nodes[mapped_to][x] = G.nodes[n][x] # add edges for e in G.edges(data=True): if e[-1]["edge_type"] not in edge_types: continue if (True if self_loops else attribute_data[e[0]] != attribute_data[e[1]]) and ( True if root_level is None else G.nodes[e[0]]["level"] != root_level ): # special treatment for root k = nG.add_edge( attribute_data[e[0]], attribute_data[e[1]], edge_type=e[-1]["edge_type"] ) if e[-1]["edge_type"] == "sequence": nG.edges[attribute_data[e[0]], attribute_data[e[1]], k]["weight"] = e[ -1 ]["weight"] nG.graph["name"] = f'{G.graph["name"]}_quotient_graph_{node_attribute}' if aggregation_attrs: aggregate_attr_in_quotient_graph(nG, G, new_nodes, aggregation_attrs) return nG
[docs]def aggregate_attr_in_quotient_graph(nG, G, new_nodes, aggregation_attrs): """ Sums attributes of nodes in an original graph per community and adds the sum to the nodes in a quotient graph. Args: nG: Quotient graph G: Original graph new_nodes: Mapping of nodes in the quotient graph to an iterable of nodes in the original graph that are represented by the node in the quotient graph. aggregation_attrs: attributes to aggregate """ for attr in aggregation_attrs: attr_data = nx.get_node_attributes(G, attr) for community_id, nodes in new_nodes.items(): aggregated_value = sum(attr_data.get(n) for n in nodes) nG.nodes[community_id][attr] = aggregated_value
[docs]def load_graph_from_csv_files( crossreference_folder, file_basename, filter="exclude_subseqitems", filter_by_edge_types=None, ): """ Loads a networkx MultiDiGraph from a nodelist and edgelist formatted as .csv.gz files. The node csv must have a 'key' column that serves as a node key. Other columns are added as node attributes. The edge csv must have a the columns 'u', 'v', 'edge_type'. If filter is node all nodes will be loaded. By default subeqitems will be excluded. If filter is a callable, it is called with a pandas.DataFrame loaded from the csv as the only argument. The callable must return values to filter the DataFrame. Args: crossreference_folder: Folder containing the edgelists file_basename: base filename of the edgelists (will be suffixed with '.nodes.csv.gz' and '.edges.csv.gz') filter: Filters the nodes to load. Options "exclude_subseqitems", None or a function that filters a pandas.DataFrame filter_by_edge_types: Filters the edges to load. None in includes all edges. You can also provide a list of edge_types. E.g. `['containment', 'reference']`. """ nodes_csv_path = os.path.join( crossreference_folder, f"{file_basename}.nodes.csv.gz" ) edges_csv_path = os.path.join( crossreference_folder, f"{file_basename}.edges.csv.gz" ) G = nx.MultiDiGraph(name=str(file_basename)) nodes_df = pd.read_csv(nodes_csv_path) if filter == "exclude_subseqitems": nodes_df = nodes_df[nodes_df.type != "subseqitem"] elif callable(filter): nodes_df = nodes_df[filter(nodes_df)] G.add_nodes_from(list(nodes_df.key)) for column in nodes_df.columns: attrs_dict = { k: v for k, v in zip(nodes_df.key, nodes_df[column]) if not pd.isna(v) } nx.set_node_attributes( G, attrs_dict, column, ) all_nodes = set(nodes_df.key) del nodes_df edges_df = pd.read_csv(edges_csv_path) edges = [ (u, v, {"edge_type": edge_type}) for u, v, edge_type in zip(edges_df.u, edges_df.v, edges_df.edge_type) if ( (filter_by_edge_types is None or edge_type in filter_by_edge_types) and u in all_nodes and v in all_nodes ) ] del edges_df G.add_edges_from(edges) return G