High-speed graph ingestion with Memgraph and Apache Spark
This guide demonstrates how to ingest nearly 2 million edges into Memgraph in about 3 seconds using Apache Spark. The script leverages Memgraph's IN_MEMORY_ANALYTICAL storage mode for multithreaded writes, DROP GRAPH for instantly clearing the dataset, and indexing to speed up relationship insertion. Below, we break down the key components of the script and explain why each matters.
Prerequisites
- Memgraph installed and running
- Apache Spark configured (using PySpark)
- GQLAlchemy installed (this guide uses it to query Memgraph from Python)
- CSV files containing node and relationship data
Compatibility table
When using Apache Spark with Memgraph, ensure that you use the correct Neo4j Spark Connector version based on your Spark and Scala versions. Below is a compatibility table:
| Spark Version | Artifact (Scala 2.12) | Artifact (Scala 2.13) |
|---|---|---|
| 3.3 | `org.neo4j:neo4j-connector-apache-spark_2.12:5.1.0_for_spark_3` | `org.neo4j:neo4j-connector-apache-spark_2.13:5.1.0_for_spark_3` |
| 3.2 | `org.neo4j:neo4j-connector-apache-spark_2.12:5.0.3_for_spark_3` | `org.neo4j:neo4j-connector-apache-spark_2.13:5.0.3_for_spark_3` |
| 3.0 and 3.1 | `org.neo4j:neo4j-connector-apache-spark_2.12:4.1.5_for_spark_3` | `org.neo4j:neo4j-connector-apache-spark_2.13:4.1.5_for_spark_3` |
Make sure to include the appropriate dependency in your Spark configuration to ensure compatibility with Memgraph when performing high-speed graph ingestion. 🚀
Running Memgraph with Docker
To start Memgraph using Docker, use the following command:
```bash
docker run -it --rm -p 7687:7687 -p 3000:3000 memgraph/memgraph-mage
```
This command runs Memgraph in interactive mode, exposing the Bolt port (7687) and the web interface port (3000).
CSV file structure
Before running the ingestion script, ensure your CSV files are structured correctly:
- Node CSV (`pokec_medium_nodes.csv` -> 100k nodes)
  - Contains a single column:
    - `id`: Unique identifier for each person
- Relationship CSV (`pokec_medium_relationships.csv` -> 1.7 million edges)
  - Contains two columns:
    - `from_id`: ID of the starting node (person)
    - `to_id`: ID of the connected node (person)
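If you want to dry-run the pipeline before downloading the full Pokec dataset, a few hand-written rows in the same layout are enough. The snippet below is a minimal sketch (the five nodes and four edges are made up) that writes both files in the expected format:

```python
import csv

# Write a tiny stand-in for pokec_medium_nodes.csv (header: id).
with open("pokec_medium_nodes.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["id"])
    writer.writerows([[i] for i in range(1, 6)])

# Write a tiny stand-in for pokec_medium_relationships.csv (header: from_id,to_id).
with open("pokec_medium_relationships.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["from_id", "to_id"])
    writer.writerows([[1, 2], [2, 3], [3, 1], [4, 5]])
```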
Code breakdown
1. Establish connection to Memgraph
```python
from gqlalchemy import Memgraph
from pyspark.sql import SparkSession
import time

# Connection to Memgraph
url = "bolt://localhost:7687"
dbname = "memgraph"
memgraph = Memgraph()
```
Memgraph is initialized using the `gqlalchemy` Python client, which establishes a connection over the Bolt protocol.
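Before moving on, it can be worth a quick round trip to confirm the connection works. A minimal check, assuming Memgraph is reachable on the default `localhost:7687`:

```python
# Trivial query over Bolt; raises if Memgraph is unreachable.
result = next(memgraph.execute_and_fetch("RETURN 1 AS ok;"))
print(result["ok"])  # -> 1
```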
2. Configure Memgraph for optimal ingestion
memgraph.execute("STORAGE MODE IN_MEMORY_ANALYTICAL;")
memgraph.execute("DROP GRAPH;")
memgraph.execute("CREATE INDEX ON :Person")
memgraph.execute("CREATE INDEX ON :Person(id)")
Why `IN_MEMORY_ANALYTICAL` is important for multithreading
Setting `STORAGE MODE IN_MEMORY_ANALYTICAL` trades transactional guarantees for raw write throughput: Memgraph relaxes write-ahead logging and transactional delta tracking, so multiple threads can insert data concurrently without transaction conflicts. This mode significantly improves performance for analytical queries and bulk inserts.
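You can verify which storage mode is active with `SHOW STORAGE INFO`. A small sketch (the exact field names may vary across Memgraph versions):

```python
# Print the storage configuration, including the active storage mode.
for row in memgraph.execute_and_fetch("SHOW STORAGE INFO;"):
    print(row)
```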
How `DROP GRAPH` instantly clears the dataset
Executing `DROP GRAPH` removes all data from Memgraph at once and releases the memory it occupied (the command is available in `IN_MEMORY_ANALYTICAL` mode). This ensures each ingestion run starts from a clean slate, avoiding conflicts and keeping performance consistent.
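A quick sanity check that the database is indeed empty after the drop:

```python
# The node count should be zero right after DROP GRAPH.
count = next(memgraph.execute_and_fetch("MATCH (n) RETURN count(n) AS n;"))["n"]
assert count == 0, f"expected an empty graph, found {count} nodes"
```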
Why indexing improves relationship ingestion
Creating indexes on the `Person` label and its `id` property speeds up node lookups when relationships are created. Without indexes, the database would perform a full scan to find matching nodes, making relationship ingestion significantly slower.
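To confirm the indexes exist before starting the heavy relationship load, you can list them with `SHOW INDEX INFO`:

```python
# List all indexes; expect entries for :Person and :Person(id).
for row in memgraph.execute_and_fetch("SHOW INDEX INFO;"):
    print(row)
```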
3. Initialize Spark session
```python
spark = (
    SparkSession.builder.config("neo4j.url", url)
    .config("neo4j.authentication.basic.username", "")
    .config("neo4j.authentication.basic.password", "")
    .config(
        "spark.jars.packages",
        "org.neo4j:neo4j-connector-apache-spark_2.12:5.1.0_for_spark_3",
    )
    .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow")
    .config("spark.executor.extraJavaOptions", "-Djava.security.manager=allow")
    .config("neo4j.database", dbname)
    .getOrCreate()
)
```
The Spark session is configured with the Neo4j Spark connector; because Memgraph speaks the Bolt protocol, the same connector handles ingestion into Memgraph. Note that the `neo4j.*` option names come from the connector, even though the target database is Memgraph.
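If the builder chain gets unwieldy, the same session can be assembled from a plain dictionary of options. This is just a stylistic sketch of the code above, not a different API:

```python
# Hypothetical refactor: gather the connector options in one place.
connector_options = {
    "neo4j.url": url,
    "neo4j.authentication.basic.username": "",
    "neo4j.authentication.basic.password": "",
    "neo4j.database": dbname,
    "spark.jars.packages": "org.neo4j:neo4j-connector-apache-spark_2.12:5.1.0_for_spark_3",
    "spark.driver.extraJavaOptions": "-Djava.security.manager=allow",
    "spark.executor.extraJavaOptions": "-Djava.security.manager=allow",
}

builder = SparkSession.builder
for key, value in connector_options.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()
```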
4. Load nodes from CSV and write to Memgraph
```python
csv_file_path = "pokec_medium_nodes.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

node_ingest_start_time = time.time()
(
    df.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", "Person")
    .option("node.keys", "id")
    .option("batch.size", 5000)
    .option("numPartitions", 10)
    .save()
)
node_ingest_end_time = time.time()

print(f"✅ Nodes successfully ingested into Memgraph from CSV after {(node_ingest_end_time - node_ingest_start_time):.2f}s!")
```
After running this part, we can observe the loading time:
```
✅ Nodes successfully ingested into Memgraph from CSV after 1.85s!
```
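As an optional sanity check, the connector can also read nodes back from Memgraph, so you can compare counts with the source DataFrame. A sketch, assuming the session from step 3:

```python
# Read the :Person nodes back through the connector and compare counts.
persons = (
    spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", "Person")
    .load()
)
print(f"Source rows: {df.count()}, nodes in Memgraph: {persons.count()}")
```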
Optimizing node ingestion
- Batch size of 5000: Processes data in chunks to improve efficiency.
- 10 partitions: Utilizes multiple threads for parallel processing, reducing ingestion time.
- Overwrite mode: Ensures a fresh start for the dataset.
5. Load relationships and write to Memgraph
```python
relationships_csv_path = "pokec_medium_relationships.csv"
relationships_df = spark.read.csv(relationships_csv_path, header=True, inferSchema=True)

query = """
MATCH (p1:Person {id: event.from_id})
MATCH (p2:Person {id: event.to_id})
CREATE (p1)-[:CONNECTED_TO]->(p2)
"""

rel_ingest_start_time = time.time()
(
    relationships_df.write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("query", query)
    .option("batch.size", 1000)
    .option("numPartitions", 8)
    .save()
)
rel_ingest_end_time = time.time()

print(f"✅ Relationships successfully ingested into Memgraph after {(rel_ingest_end_time - rel_ingest_start_time):.2f}s!")
```
Optimizing relationship ingestion
- Batch size of 1000: Ensures efficient transaction handling.
- 8 partitions: Balances parallelism with system resources.
- Using indexed node lookups: The `MATCH` clauses rely on the indexes created earlier for fast lookups.
After ingesting the relationships, we can observe the loading time:
```
✅ Relationships successfully ingested into Memgraph after 3.10s!
```
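To double-check the result from the Python side, count the edges through GQLAlchemy. A minimal sketch:

```python
# Count all CONNECTED_TO relationships; should match the CSV row count.
result = next(memgraph.execute_and_fetch(
    "MATCH (:Person)-[r:CONNECTED_TO]->(:Person) RETURN count(r) AS edges;"
))
print(f"Edges in Memgraph: {result['edges']}")
```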
Conclusion
This script showcases how Memgraph, combined with Spark, can ingest close to two million edges in a few seconds by leveraging:
- In-memory storage for parallel processing
- Efficient indexing for quick node lookups
- Multi-threaded Spark ingestion with batch processing
For further optimization, tune batch sizes and partition counts to match your hardware resources; a sketch of such a sweep is shown below.
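The sketch below is illustrative only: the candidate values are assumptions, and each run deletes the previously ingested edges so the timings stay comparable.

```python
import itertools
import time

def ingest_relationships(df, query, batch_size, partitions):
    """Write relationships with the given batch size and partition count."""
    start = time.time()
    (
        df.write.format("org.neo4j.spark.DataSource")
        .mode("Append")
        .option("query", query)
        .option("batch.size", batch_size)
        .option("numPartitions", partitions)
        .save()
    )
    return time.time() - start

for batch_size, partitions in itertools.product([1000, 5000], [4, 8, 16]):
    # Reset edges between runs so each measurement starts from the same state.
    memgraph.execute("MATCH ()-[r:CONNECTED_TO]->() DELETE r;")
    elapsed = ingest_relationships(relationships_df, query, batch_size, partitions)
    print(f"batch.size={batch_size}, numPartitions={partitions}: {elapsed:.2f}s")
```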