migrate

The migrate module provides an efficient way to transfer data from relational databases and other external systems into Memgraph. It allows you to retrieve data from various source systems and transform tabular results into graph structures.

With Cypher, you can shape the migrated data dynamically, making it easy to create nodes, establish relationships, and enrich your graph. Below are examples showing how to retrieve, filter, and convert relational data into a graph format.

Trait            Value
Module type      util
Implementation   Python
Parallelism      sequential

Procedures

arrow_flight()

With the arrow_flight() procedure, users can access data sources that support the Arrow Flight RPC protocol for high-performance transfer of large data records. The underlying implementation uses the pyarrow Python library to stream rows to Memgraph. Dremio is a confirmed data source that works with the arrow_flight() procedure; other sources may also be compatible, but Dremio is the one confirmed by previous experience.

Input:

  • query: str ➡ Query used to retrieve data from the data source.
  • config: mgp.Map ➡ Connection parameters (as in pyarrow.flight.connect). Useful parameters for connecting are host, port, username and password.
  • config_path ➡ Path to a JSON file containing configuration parameters (see the example at the end of this section).

Output:

  • row: mgp.Map ➡ The result table as a stream of rows.

Usage:

Retrieve and inspect data

CALL migrate.arrow_flight('SELECT * FROM users', {username: 'memgraph',
        password: 'password',
        host: 'localhost',
        port: '12345'} )
YIELD row
RETURN row
LIMIT 5000;

Filter specific data

CALL migrate.arrow_flight('SELECT * FROM users', {username: 'memgraph',
        password: 'password',
        host: 'localhost',
        port: '12345'} )
YIELD row
WHERE row.age >= 30
RETURN row;

Create nodes from migrated data

CALL migrate.arrow_flight('SELECT id, name, age FROM users', {username: 'memgraph',
        password: 'password',
        host: 'localhost',
        port: '12345'} )
YIELD row
CREATE (u:User {id: row.id, name: row.name, age: row.age});

Create relationships between users

CALL migrate.arrow_flight('SELECT user1_id, user2_id FROM friendships', {username: 'memgraph',
        password: 'password',
        host: 'localhost',
        port: '12345'} )
YIELD row
MATCH (u1:User {id: row.user1_id}), (u2:User {id: row.user2_id})
CREATE (u1)-[:FRIENDS_WITH]->(u2);
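
Use a JSON configuration file

Instead of passing credentials inline, the connection parameters can be kept in the JSON file referenced by config_path. This is a minimal sketch assuming the file holds the same keys you would otherwise put in config and that config_path is supplied as the third argument; the file path is a placeholder.

// flight_config.json is assumed to contain:
// {"host": "localhost", "port": "12345", "username": "memgraph", "password": "password"}
CALL migrate.arrow_flight('SELECT * FROM users', {}, '/path/to/flight_config.json')
YIELD row
RETURN row
LIMIT 100;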

duckdb()

With the migrate.duckdb() procedure, users can connect to the DuckDB database and query various data sources. The list of data sources supported by DuckDB can be found in its official documentation. The underlying implementation streams results from DuckDB to Memgraph using the duckdb Python library. DuckDB is started in in-memory mode, without any persistence, and is used only as a proxy to the underlying data sources.

Input:

  • query: str ➡ Table name or an SQL query.
  • setup_queries: mgp.Nullable[List[str]] ➡ List of queries that will be executed prior to the query provided as the initial argument. Used for setting up the connection to additional data sources.

Output:

  • row: mgp.Map ➡ The result table as a stream of rows.

Usage:

Retrieve and inspect data

CALL migrate.duckdb("SELECT * FROM 'test.parquet';")
YIELD row
RETURN row
LIMIT 5000;

Filter specific data

CALL migrate.duckdb("SELECT * FROM 'test.parquet';")
YIELD row
WHERE row.age >= 30
RETURN row;

Create nodes from migrated data

CALL migrate.duckdb("SELECT * FROM 'test.parquet';")
YIELD row
CREATE (u:User {id: row.id, name: row.name, age: row.age});

Create relationships between users

CALL migrate.duckdb("SELECT * FROM 'test.parquet';")
YIELD row
MATCH (u1:User {id: row.user1_id}), (u2:User {id: row.user2_id})
CREATE (u1)-[:FRIENDS_WITH]->(u2);

Setup connection to query additional data sources

CALL migrate.duckdb("SELECT * FROM 's3://your_bucket/your_file.parquet';", ["CREATE SECRET secret1 (TYPE s3, KEY_ID 'key', SECRET 'secret', REGION 'region');"])
YIELD row
MATCH (u1:User {id: row.user1_id}), (u2:User {id: row.user2_id})
CREATE (u1)-[:FRIENDS_WITH]->(u2);
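
Attach another database through setup_queries

Because DuckDB only acts as a proxy, setup_queries can also attach other databases, such as PostgreSQL. This is a minimal sketch assuming the DuckDB postgres extension can be installed at runtime; the connection string, schema, and table names are placeholders.

CALL migrate.duckdb("SELECT * FROM pg.public.users;",
    ["INSTALL postgres;",
     "LOAD postgres;",
     "ATTACH 'dbname=demo_db user=memgraph host=localhost' AS pg (TYPE postgres);"])
YIELD row
RETURN row
LIMIT 100;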

memgraph()

With the migrate.memgraph() procedure, you can access another Memgraph instance and migrate your data to a new Memgraph instance.
The resulting nodes and relationships are converted into a stream of rows that can include labels, properties, and primitives.

Streaming of raw node and relationship objects is not supported, so users are advised to migrate all the necessary identifiers in order to recreate the same graph in Memgraph.

Input:

  • label_or_rel_or_query: str ➡ Label name (written in the format (:Label)), relationship type (written in the format [:REL_TYPE]), or a plain Cypher query.
  • config: mgp.Map ➡ Connection parameters (as in gqlalchemy.Memgraph). Notable parameters are host (String) and port (Integer).
  • config_path ➡ Path to a JSON file containing configuration parameters.
  • params: mgp.Nullable[mgp.Any] (default=None) ➡ Query parameters (if applicable); see the last example in this section.

Output:

  • row: mgp.Map ➡ The result table as a stream of rows.
    • When retrieving nodes using the (:Label) syntax, row will have the following keys: labels and properties.
    • When retrieving relationships using the [:REL_TYPE] syntax, row will have the following keys: from_labels, to_labels, from_properties, to_properties and edge_properties.
    • When retrieving results using a plain Cypher query, row will have keys identical to the returned column names from the Cypher query.

Usage:

Retrieve nodes of certain label and create them in a new Memgraph instance

CALL migrate.memgraph('(:Person)', {host: 'localhost', port: 7687})
YIELD row
WITH row.labels AS labels, row.properties AS props
CREATE (n:labels) SET n += props;

Retrieve relationships of certain type and create them in a new Memgraph instance

CALL migrate.memgraph('[:KNOWS]', {host: 'localhost', port: 7687})
YIELD row
WITH row.from_labels AS from_labels,
        row.to_labels AS to_labels,
        row.from_properties AS from_properties,
        row.to_properties AS to_properties,
        row.edge_properties AS edge_properties
MATCH (p1:Person {id: from_properties.id})
MATCH (p2:Person {id: to_properties.id})
CREATE (p1)-[r:KNOWS]->(p2)
SET r += edge_properties;

Retrieve information from Memgraph using an arbitrary Cypher query

CALL migrate.memgraph('MATCH (n) RETURN count(n) as cnt', {host: 'localhost', port: 7687})
YIELD row
RETURN row.cnt as cnt;
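
Run a parameterized Cypher query against the source instance

The params argument can be used with query parameters in the Cypher query sent to the other Memgraph instance. This is a minimal sketch assuming config_path is passed as an empty string when it is not needed, so params can be supplied positionally.

CALL migrate.memgraph('MATCH (p:Person) WHERE p.age > $min_age RETURN p.name AS name, p.age AS age',
        {host: 'localhost', port: 7687}, '', {min_age: 30})
YIELD row
RETURN row.name AS name, row.age AS age;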

mysql()

With the migrate.mysql() procedure, you can access MySQL and migrate your data to Memgraph.
The result table is converted into a stream, and the returned rows can be used to create graph structures.

Input:

  • table_or_sql: str ➡ Table name or an SQL query.
  • config: mgp.Map ➡ Connection parameters (as in mysql.connector.connect).
  • config_path ➡ Path to a JSON file containing configuration parameters.
  • params: mgp.Nullable[mgp.Any] (default=None) ➡ Query parameters (if applicable); see the last example in this section.

Output:

  • row: mgp.Map ➡ The result table as a stream of rows.

Usage:

Retrieve and inspect data

CALL migrate.mysql('example_table', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'demo_db'} )
YIELD row
RETURN row
LIMIT 5000;

Filter specific data

CALL migrate.mysql('SELECT * FROM users', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'demo_db'} )
YIELD row
WHERE row.age >= 30
RETURN row;

Create nodes from migrated data

CALL migrate.mysql('SELECT id, name, age FROM users', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'demo_db'} )
YIELD row
CREATE (u:User {id: row.id, name: row.name, age: row.age});

Create relationships between users

CALL migrate.mysql('SELECT user1_id, user2_id FROM friendships', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'demo_db'} )
YIELD row
MATCH (u1:User {id: row.user1_id}), (u2:User {id: row.user2_id})
CREATE (u1)-[:FRIENDS_WITH]->(u2);
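
Use query parameters

The params argument can be combined with placeholders in the SQL query. This is a minimal sketch assuming the procedure forwards params to the MySQL driver, which uses %s-style placeholders, and that config_path is passed as an empty string when it is not needed.

CALL migrate.mysql('SELECT id, name, age FROM users WHERE age >= %s', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'demo_db'}, '', [30])
YIELD row
CREATE (u:User {id: row.id, name: row.name, age: row.age});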

neo4j()

With the migrate.neo4j() procedure, you can access Neo4j and migrate your data to Memgraph.
The resulting nodes and relationships are converted into a stream of rows that can include labels, properties, and primitives. Streaming of raw node and relationship objects is not supported, so users are advised to migrate all the necessary identifiers in order to recreate the same graph in Memgraph.

Input:

  • label_or_rel_or_query: str ➡ Label name (written in the format (:Label)), relationship type (written in the format [:REL_TYPE]), or a plain Cypher query.
  • config: mgp.Map ➡ Connection parameters (as in gqlalchemy.Neo4j). Notable parameters are host (String) and port (Integer).
  • config_path ➡ Path to a JSON file containing configuration parameters.
  • params: mgp.Nullable[mgp.Any] (default=None) ➡ Query parameters (if applicable).

Output:

  • row: mgp.Map ➡ The result table as a stream of rows.
    • When retrieving nodes using the (:Label) syntax, row will have the following keys: labels and properties.
    • When retrieving relationships using the [:REL_TYPE] syntax, row will have the following keys: from_labels, to_labels, from_properties, to_properties and edge_properties.
    • When retrieving results using a plain Cypher query, row will have keys identical to the returned column names from the Cypher query.

Usage:

Retrieve nodes of certain label and create them in Memgraph

CALL migrate.neo4j('(:Person)', {host: 'localhost', port: 7687})
YIELD row
WITH row.labels AS labels, row.properties AS props
CREATE (n:labels) SET n += props;

Retrieve relationships of certain type and create them in Memgraph

CALL migrate.neo4j('[:KNOWS]', {host: 'localhost', port: 7687})
YIELD row
WITH row.from_labels AS from_labels,
        row.to_labels AS to_labels,
        row.from_properties AS from_properties,
        row.to_properties AS to_properties,
        row.edge_properties AS edge_properties
MATCH (p1:Person {id: from_properties.id})
MATCH (p2:Person {id: to_properties.id})
CREATE (p1)-[r:KNOWS]->(p2)
SET r += edge_properties;

Retrieve information from Neo4j using an arbitrary Cypher query

CALL migrate.neo4j('MATCH (n) RETURN count(n) as cnt', {host: 'localhost', port: 7687})
YIELD row
RETURN row.cnt as cnt;
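
Migrate nodes using a plain Cypher query

A plain Cypher query can also be used to shape the migrated data explicitly. This is a minimal sketch assuming the source Neo4j instance contains Person nodes with id and name properties.

CALL migrate.neo4j('MATCH (p:Person) RETURN p.id AS id, p.name AS name', {host: 'localhost', port: 7687})
YIELD row
MERGE (p:Person {id: row.id})
SET p.name = row.name;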

oracle_db()

With the migrate.oracle_db() procedure, you can access Oracle DB and migrate your data to Memgraph.

Input:

  • table_or_sql: str ➡ Table name or an SQL query.
  • config: mgp.Map ➡ Connection parameters (as in oracledb.connect).
  • config_path ➡ Path to a JSON file containing configuration parameters.
  • params: mgp.Nullable[mgp.Any] (default=None) ➡ Query parameters (if applicable).

Output:

  • row: mgp.Map ➡ The result table as a stream of rows.

Usage:

Retrieve and inspect data

CALL migrate.oracle_db('example_table', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'demo_db'} )
YIELD row
RETURN row
LIMIT 5000;

Merge nodes to avoid duplicates

CALL migrate.oracle_db('SELECT id, name FROM companies', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'business_db'} )
YIELD row
MERGE (c:Company {id: row.id})
SET c.name = row.name;
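
Create relationships from a join table

Relationships can be created from join-table rows in the same way as with the other SQL procedures. The table and column names below are placeholders.

CALL migrate.oracle_db('SELECT company_id, employee_id FROM employments', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'business_db'} )
YIELD row
MATCH (c:Company {id: row.company_id}), (e:Employee {id: row.employee_id})
CREATE (e)-[:WORKS_AT]->(c);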

postgresql()

With the migrate.postgresql() procedure, you can access PostgreSQL and migrate your data to Memgraph.

Input:

  • table_or_sql: str ➡ Table name or an SQL query.
  • config: mgp.Map ➡ Connection parameters (as in psycopg2.connect).
  • config_path ➡ Path to a JSON file containing configuration parameters.
  • params: mgp.Nullable[mgp.Any] (default=None) ➡ Query parameters (if applicable); see the last example in this section.

Output:

  • row: mgp.Map ➡ The result table as a stream of rows.

Usage:

Retrieve and inspect data

CALL migrate.postgresql('example_table', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'demo_db'} )
YIELD row
RETURN row
LIMIT 5000;

Create nodes for products

CALL migrate.postgresql('SELECT product_id, name, price FROM products', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'retail_db'} )
YIELD row
CREATE (p:Product {id: row.product_id, name: row.name, price: row.price});

Establish relationships between orders and customers

CALL migrate.postgresql('SELECT order_id, customer_id FROM orders', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'retail_db'} )
YIELD row
MATCH (o:Order {id: row.order_id}), (c:Customer {id: row.customer_id})
CREATE (c)-[:PLACED]->(o);
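
Use query parameters

As with the other SQL procedures, params can be combined with placeholders in the query. This is a minimal sketch assuming the procedure forwards params to the PostgreSQL driver, which uses %s-style placeholders, and that config_path is passed as an empty string when it is not needed.

CALL migrate.postgresql('SELECT product_id, name, price FROM products WHERE price >= %s', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'retail_db'}, '', [100])
YIELD row
CREATE (p:Product {id: row.product_id, name: row.name, price: row.price});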

sql_server()

With the migrate.sql_server() procedure, you can access SQL Server and migrate your data to Memgraph.

Input:

  • table_or_sql: str ➡ Table name or an SQL query.
  • config: mgp.Map ➡ Connection parameters (as in pyodbc.connect).
  • config_path ➡ Path to a JSON file containing configuration parameters.
  • params: mgp.Nullable[mgp.Any] (default=None) ➡ Query parameters (if applicable).

Output:

  • row: mgp.Map ➡ The result table as a stream of rows.

Usage:

Retrieve and inspect data

CALL migrate.sql_server('example_table', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'demo_db'} )
YIELD row
RETURN row
LIMIT 5000;

Convert SQL table rows into graph nodes

CALL migrate.sql_server('SELECT id, name, role FROM employees', {user: 'memgraph',
        password: 'password',
        host: 'localhost',
        database: 'company_db'} )
YIELD row
CREATE (e:Employee {id: row.id, name: row.name, role: row.role});

s3()

With the migrate.s3() procedure, you can access a CSV file in AWS S3, stream the data into Memgraph, and transform it into a graph representation using Cypher. The migration uses the Python boto3 client.

Input:

  • file_path: str ➡ S3 file path in the format 's3://bucket-name/path/to/file.csv'.
  • config: mgp.Map ➡ AWS connection parameters. All of them are optional; if a parameter is omitted, the corresponding environment variable is used (see the last example in this section).
    • aws_access_key_id - if not provided, environment variable AWS_ACCESS_KEY_ID will be used
    • aws_secret_access_key - if not provided, environment variable AWS_SECRET_ACCESS_KEY will be used
    • region_name - if not provided, environment variable AWS_REGION will be used
    • aws_session_token - if not provided, environment variable AWS_SESSION_TOKEN will be used
  • config_path: str (optional) ➡ Path to a JSON file containing AWS credentials.

Output:

  • row: mgp.Map ➡ Each row from the CSV file as a structured dictionary.

Usage:

Retrieve and inspect CSV data from S3

CALL migrate.s3('s3://my-bucket/data.csv', {aws_access_key_id: 'your-key',
        aws_secret_access_key: 'your-secret',
        region_name: 'us-east-1'} )
YIELD row
RETURN row
LIMIT 100;

Filter specific rows from the CSV

CALL migrate.s3('s3://my-bucket/customers.csv', {aws_access_key_id: 'your-key',
        aws_secret_access_key: 'your-secret',
        region_name: 'us-west-2'} )
YIELD row
WHERE row.age >= 30
RETURN row;

Create nodes dynamically from CSV data

CALL migrate.s3('s3://my-bucket/employees.csv', {aws_access_key_id: 'your-key',
        aws_secret_access_key: 'your-secret',
        region_name: 'eu-central-1'} )
YIELD row
CREATE (e:Employee {id: row.id, name: row.name, position: row.position});
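
Use credentials from environment variables

Since all config parameters are optional, the connection can rely entirely on the AWS environment variables listed above; in that case, pass an empty map.

CALL migrate.s3('s3://my-bucket/data.csv', {})
YIELD row
RETURN row
LIMIT 100;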

servicenow()

With the migrate.servicenow() procedure, you can access the ServiceNow REST API and transfer your data to Memgraph. The underlying implementation uses the requests Python library to stream results to Memgraph. The REST API from ServiceNow must provide results in the format {results: []} in order for Memgraph to stream them into result rows.

Input:

  • endpoint: str ➡ ServiceNow endpoint. Users can optionally include their own query parameters to filter results (see the last example in this section).
  • config: mgp.Map ➡ Connection parameters. Notable connection parameters are username and password, per the requests.get() method.
  • config_path: str ➡ Path to a JSON file containing configuration parameters.

Output:

  • row: mgp.Map ➡ Each result entry returned by the endpoint as a structured dictionary.

Usage:

Retrieve and inspect data from ServiceNow

CALL migrate.servicenow('http://my_endpoint/api/data', {})
YIELD row
RETURN row
LIMIT 100;

Filter specific rows

CALL migrate.servicenow('http://my_endpoint/api/data', {})
YIELD row
WHERE row.age >= 30
RETURN row;

Create nodes dynamically from the retrieved data

CALL migrate.servicenow('http://my_endpoint/api/data', {})
YIELD row
CREATE (e:Employee {id: row.id, name: row.name, position: row.position});
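
Filter results with ServiceNow query parameters

Query parameters can be appended to the endpoint so that filtering and limiting happen on the ServiceNow side. This is a minimal sketch assuming a standard ServiceNow Table API endpoint; the instance URL, table, and field names are placeholders, and username and password are passed through config for basic authentication.

CALL migrate.servicenow('https://my_instance.service-now.com/api/now/table/incident?sysparm_query=active=true&sysparm_limit=100',
        {username: 'memgraph', password: 'password'})
YIELD row
CREATE (i:Incident {number: row.number, short_description: row.short_description});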