migrate

The migrate module provides an efficient way to transfer graph data from various relational databases into Memgraph. It allows you to retrieve data from various source systems and transform tabular data into graph structures.

With Cypher, you can shape the migrated data dynamically, making it easy to create nodes, establish relationships, and enrich your graph. Below are examples showing how to retrieve, filter, and convert relational data into a graph format.
Trait | Value
---|---
Module type | util
Implementation | Python
Parallelism | sequential
Procedures
arrow_flight()

With the arrow_flight() procedure, users can access data sources that support the Arrow Flight RPC protocol for transferring large data records with high performance. The underlying implementation uses the pyarrow Python library to stream rows to Memgraph. Dremio is a confirmed data source that works with the arrow_flight() procedure. Other sources may also be compatible, but Dremio is the one confirmed by previous experience.
Input:
query: str
➡ Query used to query the data source.
config: mgp.Map
➡ Connection parameters (as in pyarrow.flight.connect). Useful parameters for connecting are host, port, username and password.
config_path
➡ Path to a JSON file containing configuration parameters.
Output:
row: mgp.Map
➡ The result table as a stream of rows.
Usage:
Retrieve and inspect data
CALL migrate.arrow_flight('SELECT * FROM users', {username: 'memgraph',
password: 'password',
host: 'localhost',
port: '12345'} )
YIELD row
RETURN row
LIMIT 5000;
Filter specific data
CALL migrate.arrow_flight('SELECT * FROM users', {username: 'memgraph',
password: 'password',
host: 'localhost',
port: '12345'} )
YIELD row
WHERE row.age >= 30
RETURN row;
Create nodes from migrated data
CALL migrate.arrow_flight('SELECT id, name, age FROM users', {username: 'memgraph',
password: 'password',
host: 'localhost',
port: '12345'} )
YIELD row
CREATE (u:User {id: row.id, name: row.name, age: row.age});
Create relationships between users
CALL migrate.arrow_flight('SELECT user1_id, user2_id FROM friendships', {username: 'memgraph',
password: 'password',
host: 'localhost',
port: '12345'} )
YIELD row
MATCH (u1:User {id: row.user1_id}), (u2:User {id: row.user2_id})
CREATE (u1)-[:FRIENDS_WITH]->(u2);
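Load connection parameters from a JSON file
A sketch of the config_path option, assuming config_path is accepted as the third argument (as listed above) and that an empty map can be passed for config; the file path below is hypothetical.
CALL migrate.arrow_flight('SELECT * FROM users', {}, '/etc/memgraph/flight_config.json')
YIELD row
RETURN row
LIMIT 100;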
duckdb()

With the migrate.duckdb() procedure, users can connect to the DuckDB database and query various data sources.
The list of data sources supported by DuckDB can be found on their official documentation page.
The underlying implementation streams results from DuckDB to Memgraph using the duckdb Python library. DuckDB is started in in-memory mode, without any persistence, and is used only as a proxy to the underlying data sources.
Input:
query: str
➡ Table name or an SQL query.
setup_queries: mgp.Nullable[List[str]]
➡ List of queries that will be executed prior to the query provided as the initial argument. Used for setting up the connection to additional data sources.
Output:
row: mgp.Map
➡ The result table as a stream of rows.
Usage:
Retrieve and inspect data
CALL migrate.duckdb("SELECT * FROM 'test.parquet';")
YIELD row
RETURN row
LIMIT 5000;
Filter specific data
CALL migrate.duckdb("SELECT * FROM 'test.parquet';")
YIELD row
WHERE row.age >= 30
RETURN row;
Create nodes from migrated data
CALL migrate.duckdb("SELECT * FROM 'test.parquet';")
YIELD row
CREATE (u:User {id: row.id, name: row.name, age: row.age});
Create relationships between users
CALL migrate.duckdb("SELECT * FROM 'test.parquet';")
YIELD row
MATCH (u1:User {id: row.user1_id}), (u2:User {id: row.user2_id})
CREATE (u1)-[:FRIENDS_WITH]->(u2);
Setup connection to query additional data sources
CALL migrate.duckdb("SELECT * FROM 's3://your_bucket/your_file.parquet';", ["CREATE SECRET secret1 (TYPE s3, KEY_ID 'key', SECRET 'secret', REGION 'region');"])
YIELD row
MATCH (u1:User {id: row.user1_id}), (u2:User {id: row.user2_id})
CREATE (u1)-[:FRIENDS_WITH]->(u2);
memgraph()
With the migrate.memgraph()
procedure, you can access another Memgraph instance and migrate your data to a new Memgraph instance.
The resulting nodes and edges are converted into a stream of rows which can include labels, properties, and primitives.
Streaming of raw node and relationship objects is not supported, and users are advised to migrate all the necessary identifiers in order to recreate the same graph in Memgraph.
Input:
label_or_rel_or_query: str
➡ Label name (written in format (:Label)), relationship name (written in format [:rel_type]), or a plain Cypher query.
config: mgp.Map
➡ Connection parameters (as in gqlalchemy.Memgraph). Notable parameters are host[String] and port[Integer].
config_path
➡ Path to a JSON file containing configuration parameters.
params: mgp.Nullable[mgp.Any] (default=None)
➡ Query parameters (if applicable).
Output:
row: mgp.Map
➡ The result table as a stream of rows.
- When retrieving nodes using the (:Label) syntax, row will have the following keys: labels and properties.
- When retrieving relationships using the [:REL_TYPE] syntax, row will have the following keys: from_labels, to_labels, from_properties, to_properties, and edge_properties.
- When retrieving results using a plain Cypher query, row will have keys identical to the returned column names from the Cypher query.
Usage:
Retrieve nodes of certain label and create them in a new Memgraph instance
CALL migrate.memgraph('(:Person)', {host: 'localhost', port: 7687})
YIELD row
WITH row.labels AS labels, row.properties AS props
CREATE (n:labels) SET n += props;
Retrieve relationships of certain type and create them in a new Memgraph instance
CALL migrate.memgraph('[:KNOWS]', {host: 'localhost', port: 7687})
YIELD row
WITH row.from_labels AS from_labels,
row.to_labels AS to_labels,
row.from_properties AS from_properties,
row.to_properties AS to_properties,
row.edge_properties AS edge_properties
MATCH (p1:Person {id: from_properties.id})
MATCH (p2:Person {id: to_properties.id})
CREATE (p1)-[r:KNOWS]->(p2)
SET r += edge_properties;
Retrieve information from Memgraph using an arbitrary Cypher query
CALL migrate.memgraph('MATCH (n) RETURN count(n) as cnt', {host: 'localhost', port: 7687})
YIELD row
RETURN row.cnt as cnt;
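Run a parameterized Cypher query on the source instance
A sketch of the params argument, assuming the source query uses $-prefixed Cypher parameters and that params is passed after an empty config_path; the Person label and min_age parameter are hypothetical.
CALL migrate.memgraph('MATCH (n:Person) WHERE n.age >= $min_age RETURN n.id AS id, n.age AS age',
                      {host: 'localhost', port: 7687},
                      '',
                      {min_age: 30})
YIELD row
RETURN row.id AS id, row.age AS age;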
mysql()
With the migrate.mysql()
procedure, you can access MySQL and migrate your data to Memgraph.
The result table is converted into a stream, and the returned rows can be used to create graph structures.
Input:
table_or_sql: str
➡ Table name or an SQL query.
config: mgp.Map
➡ Connection parameters (as in mysql.connector.connect).
config_path
➡ Path to a JSON file containing configuration parameters.
params: mgp.Nullable[mgp.Any] (default=None)
➡ Query parameters (if applicable).
Output:
row: mgp.Map
➡ The result table as a stream of rows.
Usage:
Retrieve and inspect data
CALL migrate.mysql('example_table', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
RETURN row
LIMIT 5000;
Filter specific data
CALL migrate.mysql('SELECT * FROM users', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
WHERE row.age >= 30
RETURN row;
Create nodes from migrated data
CALL migrate.mysql('SELECT id, name, age FROM users', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
CREATE (u:User {id: row.id, name: row.name, age: row.age});
Create relationships between users
CALL migrate.mysql('SELECT user1_id, user2_id FROM friendships', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
MATCH (u1:User {id: row.user1_id}), (u2:User {id: row.user2_id})
CREATE (u1)-[:FRIENDS_WITH]->(u2);
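Filter at the source with query parameters
A sketch of the params argument, assuming mysql.connector-style %s placeholders in the SQL and that params is passed after an empty config_path; the users table and the age threshold are hypothetical.
CALL migrate.mysql('SELECT id, name, age FROM users WHERE age >= %s', {user: 'memgraph',
    password: 'password',
    host: 'localhost',
    database: 'demo_db'},
    '',
    [30])
YIELD row
CREATE (u:User {id: row.id, name: row.name, age: row.age});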
neo4j()
With the migrate.neo4j()
procedure, you can access Neo4j and migrate your data to Memgraph.
The resulting nodes and edges are converted into a stream of rows which can include labels, properties, and primitives.
Streaming of raw node and relationship objects is not supported, and users are advised to migrate all the necessary identifiers
in order to recreate the same graph in Memgraph.
Input:
label_or_rel_or_query: str
➡ Label name (written in format (:Label)), relationship name (written in format [:rel_type]), or a plain Cypher query.
config: mgp.Map
➡ Connection parameters (as in gqlalchemy.Neo4j). Notable parameters are host[String] and port[Integer].
config_path
➡ Path to a JSON file containing configuration parameters.
params: mgp.Nullable[mgp.Any] (default=None)
➡ Query parameters (if applicable).
Output:
row: mgp.Map
➡ The result table as a stream of rows.
- When retrieving nodes using the (:Label) syntax, row will have the following keys: labels and properties.
- When retrieving relationships using the [:REL_TYPE] syntax, row will have the following keys: from_labels, to_labels, from_properties, to_properties, and edge_properties.
- When retrieving results using a plain Cypher query, row will have keys identical to the returned column names from the Cypher query.
Usage:
Retrieve nodes of certain label and create them in Memgraph
CALL migrate.neo4j('(:Person)', {host: 'localhost', port: 7687})
YIELD row
WITH row.labels AS labels, row.properties AS props
CREATE (n:labels) SET n += props;
Retrieve relationships of certain type and create them in Memgraph
CALL migrate.neo4j('[:KNOWS]', {host: 'localhost', port: 7687})
YIELD row
WITH row.from_labels AS from_labels,
row.to_labels AS to_labels,
row.from_properties AS from_properties,
row.to_properties AS to_properties,
row.edge_properties AS edge_properties
MATCH (p1:Person {id: from_properties.id})
MATCH (p2:Person {id: to_properties.id})
CREATE (p1)-[r:KNOWS]->(p2)
SET r += edge_properties;
Retrieve information from Neo4j using an arbitrary Cypher query
CALL migrate.neo4j('MATCH (n) RETURN count(n) as cnt', {host: 'localhost', port: 7687})
YIELD row
RETURN row.cnt as cnt;
oracle_db()
With the migrate.oracle_db()
procedure, you can access Oracle DB and migrate your data to Memgraph.
Input:
table_or_sql: str
➡ Table name or an SQL query.
config: mgp.Map
➡ Connection parameters (as in oracledb.connect).
config_path
➡ Path to a JSON file containing configuration parameters.
params: mgp.Nullable[mgp.Any] (default=None)
➡ Query parameters (if applicable).
Output:
row: mgp.Map
➡ The result table as a stream of rows.
Usage:
Retrieve and inspect data
CALL migrate.oracle_db('example_table', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
RETURN row
LIMIT 5000;
Merge nodes to avoid duplicates
CALL migrate.oracle_db('SELECT id, name FROM companies', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'business_db'} )
YIELD row
MERGE (c:Company {id: row.id})
SET c.name = row.name;
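Create relationships between companies and employees
Following the same pattern as the other procedures, this sketch assumes a hypothetical employments table and previously created Company and Employee nodes.
CALL migrate.oracle_db('SELECT company_id, employee_id FROM employments', {user: 'memgraph',
    password: 'password',
    host: 'localhost',
    database: 'business_db'} )
YIELD row
MATCH (c:Company {id: row.company_id}), (e:Employee {id: row.employee_id})
CREATE (e)-[:WORKS_AT]->(c);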
postgresql()
With the migrate.postgresql()
procedure, you can access PostgreSQL and migrate your data to Memgraph.
Input:
table_or_sql: str
➡ Table name or an SQL query.
config: mgp.Map
➡ Connection parameters (as in psycopg2.connect).
config_path
➡ Path to a JSON file containing configuration parameters.
params: mgp.Nullable[mgp.Any] (default=None)
➡ Query parameters (if applicable).
Output:
row: mgp.Map
➡ The result table as a stream of rows.
Usage:
Retrieve and inspect data
CALL migrate.postgresql('example_table', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
RETURN row
LIMIT 5000;
Create nodes for products
CALL migrate.postgresql('SELECT product_id, name, price FROM products', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'retail_db'} )
YIELD row
CREATE (p:Product {id: row.product_id, name: row.name, price: row.price});
Establish relationships between orders and customers
CALL migrate.postgresql('SELECT order_id, customer_id FROM orders', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'retail_db'} )
YIELD row
MATCH (o:Order {id: row.order_id}), (c:Customer {id: row.customer_id})
CREATE (c)-[:PLACED]->(o);
sql_server()
With the migrate.sql_server()
procedure, you can access SQL Server and migrate your data to Memgraph.
Input:
table_or_sql: str
➡ Table name or an SQL query.
config: mgp.Map
➡ Connection parameters (as in pyodbc.connect).
config_path
➡ Path to a JSON file containing configuration parameters.
params: mgp.Nullable[mgp.Any] (default=None)
➡ Query parameters (if applicable).
Output:
row: mgp.Map
➡ The result table as a stream of rows.
Usage:
Retrieve and inspect data
CALL migrate.sql_server('example_table', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'demo_db'} )
YIELD row
RETURN row
LIMIT 5000;
Convert SQL table rows into graph nodes
CALL migrate.sql_server('SELECT id, name, role FROM employees', {user: 'memgraph',
password: 'password',
host: 'localhost',
database: 'company_db'} )
YIELD row
CREATE (e:Employee {id: row.id, name: row.name, role: row.role});
s3()
With the migrate.s3() procedure, you can access a CSV file in AWS S3, stream the data into Memgraph, and transform it into a graph representation using Cypher. The migration uses the Python boto3 client.
Input:
file_path: str
➡ S3 file path in the format 's3://bucket-name/path/to/file.csv'.
config: mgp.Map
➡ AWS connection parameters. All of them are optional.
- aws_access_key_id - if not provided, environment variable AWS_ACCESS_KEY_ID will be used
- aws_secret_access_key - if not provided, environment variable AWS_SECRET_ACCESS_KEY will be used
- region_name - if not provided, environment variable AWS_REGION will be used
- aws_session_token - if not provided, environment variable AWS_SESSION_TOKEN will be used
config_path: str (optional)
➡ Path to a JSON file containing AWS credentials.
Output:
row: mgp.Map
➡ Each row from the CSV file as a structured dictionary.
Usage:
Retrieve and inspect CSV data from S3
CALL migrate.s3('s3://my-bucket/data.csv', {aws_access_key_id: 'your-key',
aws_secret_access_key: 'your-secret',
region_name: 'us-east-1'} )
YIELD row
RETURN row
LIMIT 100;
Filter specific rows from the CSV
CALL migrate.s3('s3://my-bucket/customers.csv', {aws_access_key_id: 'your-key',
aws_secret_access_key: 'your-secret',
region_name: 'us-west-2'} )
YIELD row
WHERE row.age >= 30
RETURN row;
Create nodes dynamically from CSV data
CALL migrate.s3('s3://my-bucket/employees.csv', {aws_access_key_id: 'your-key',
aws_secret_access_key: 'your-secret',
region_name: 'eu-central-1'} )
YIELD row
CREATE (e:Employee {id: row.id, name: row.name, position: row.position});
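Rely on environment variables for AWS credentials
Since all config parameters are optional, an empty map can be passed and the credentials are resolved from the AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and AWS_REGION environment variables described above; the bucket and file below are hypothetical.
CALL migrate.s3('s3://my-bucket/data.csv', {})
YIELD row
RETURN row
LIMIT 100;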
servicenow()
With the migrate.servicenow() procedure, you can access the ServiceNow REST API and transfer your data to Memgraph.
The underlying implementation uses the requests Python library to migrate results to Memgraph. The REST API from
ServiceNow must provide results in the format {results: []}
in order for Memgraph to stream them into result rows.
Input:
endpoint: str
➡ ServiceNow endpoint. Users can optionally include their own query parameters to filter results.
config: mgp.Map
➡ Connection parameters. Notable connection parameters are username and password, per the requests.get() method.
config_path: str
➡ Path to a JSON file containing configuration parameters.
Output:
row: mgp.Map
➡ Each row from the API response as a structured dictionary.
Usage:
Retrieve and inspect data from ServiceNow
CALL migrate.servicenow('http://my_endpoint/api/data', {})
YIELD row
RETURN row
LIMIT 100;
Filter specific rows
CALL migrate.servicenow('http://my_endpoint/api/data', {})
YIELD row
WHERE row.age >= 30
RETURN row;
Create nodes dynamically from the retrieved data
CALL migrate.servicenow('http://my_endpoint/api/data', {})
YIELD row
CREATE (e:Employee {id: row.id, name: row.name, position: row.position});
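Authenticate and filter results with query parameters
A sketch combining basic authentication (username and password in config, per requests.get() as described above) with ServiceNow-style sysparm query parameters appended to the endpoint; the instance URL, table, and returned fields are hypothetical.
CALL migrate.servicenow('https://my_instance.service-now.com/api/now/table/incident?sysparm_limit=100',
                        {username: 'memgraph', password: 'password'})
YIELD row
CREATE (i:Incident {number: row.number, short_description: row.short_description});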