Cortex Data Framework is a robust, extensible platform designed to facilitate real-time data streaming, processing, and state management. It provides developers with a comprehensive suite of tools and libraries to build scalable, high-performance data pipelines tailored to diverse use cases. By abstracting underlying streaming technologies and state management solutions, Cortex Data Framework enables seamless integration, simplified development workflows, and enhanced maintainability for complex data-driven applications.
Modular Architecture: Comprises distinct, interchangeable modules for streaming, state management, and connectors, allowing developers to choose components that best fit their requirements.
Extensive Streaming Support: Natively integrates with popular streaming platforms like Kafka and Pulsar, ensuring reliable and efficient data ingestion and distribution.
Flexible State Management: Offers in-memory and persistent state storage options (e.g., RocksDB) to maintain stateful computations and enable advanced analytics.
Developer-Friendly APIs: Provides intuitive and expressive APIs for building, configuring, and managing data streams and state stores, reducing development overhead.
Thread-Safe Operations: Ensures data integrity and consistency through built-in thread safety mechanisms, suitable for concurrent processing environments.
Telemetry and Monitoring: Integrates with telemetry tools to monitor performance metrics, aiding in proactive maintenance and optimization.
Cortex Data Framework is available through the NuGet Package Manager. You can easily add the necessary packages to your .NET project using the following methods:
Open your terminal or command prompt and navigate to your project directory, then run the following commands to install the desired packages:
# Install Cortex.Streams
dotnet add package Cortex.Streams
# Install Cortex.Streams.Kafka
dotnet add package Cortex.Streams.Kafka
# Install Cortex.Streams.Pulsar
dotnet add package Cortex.Streams.Pulsar
# Install Cortex.States
dotnet add package Cortex.States
# Install Cortex.States.RocksDb
dotnet add package Cortex.States.RocksDb
Alternatively, run the following commands in the Visual Studio Package Manager Console:
# Install Cortex.Streams
Install-Package Cortex.Streams
# Install Cortex.Streams.Kafka
Install-Package Cortex.Streams.Kafka
# Install Cortex.Streams.Pulsar
Install-Package Cortex.Streams.Pulsar
# Install Cortex.States
Install-Package Cortex.States
# Install Cortex.States.RocksDb
Install-Package Cortex.States.RocksDb
Cortex Data Framework makes it easy to set up and run real-time data processing pipelines. Below are some simple examples to get you started.
Scenario: Track the number of clicks on different web pages in real-time and display the aggregated counts.
Steps:
1. Define the Event Class
public class ClickEvent
{
    public string PageUrl { get; set; }
    public DateTime Timestamp { get; set; }
}
2. Build the Stream Pipeline
static void Main(string[] args)
{
    var clickCountStore = new InMemoryStateStore<string, int>("ClickAggregateStore");

    // Build the stream
    var stream = StreamBuilder<ClickEvent, ClickEvent>.CreateNewStream("ClickStream")
        .Stream()
        .Filter(e => !string.IsNullOrEmpty(e.PageUrl))
        .GroupBy(
            e => e.PageUrl,              // Key selector: group by PageUrl
            stateStoreName: "ClickGroupStore")
        .Aggregate<string, int>(
            e => e.PageUrl,              // Key selector for aggregation
            (count, e) => count + 1,     // Aggregation function: increment count
            stateStoreName: "ClickAggregateStore")
        .Sink(e =>
        {
            Console.WriteLine($"Page: {e.PageUrl}");
        })
        .Build();

    // Start the stream
    stream.Start();
Emit some random events into the stream:
    // Emit some events
    var random = new Random();
    var pages = new[] { "/home", "/about", "/contact", "/products" };

    // Runs until the process is stopped
    while (true)
    {
        var page = pages[random.Next(pages.Length)];
        var click = new ClickEvent
        {
            PageUrl = page,
            Timestamp = DateTime.UtcNow
        };
        stream.Emit(click);
        Thread.Sleep(100); // Simulate click rate
    }
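To make the pipeline's semantics concrete, here is a minimal, library-free sketch of what the Filter, GroupBy, and Aggregate steps compute for a batch of events. It does not use or mirror the Cortex API: `ClickCountSketch` and `CountClicks` are hypothetical names introduced only for illustration, and a plain `Dictionary<string, int>` stands in for the state store.

```csharp
using System;
using System.Collections.Generic;

// Same shape as the ClickEvent class defined in step 1.
public class ClickEvent
{
    public string PageUrl { get; set; }
    public DateTime Timestamp { get; set; }
}

public static class ClickCountSketch
{
    // Hypothetical helper (not a Cortex API): counts clicks per page URL.
    public static Dictionary<string, int> CountClicks(IEnumerable<ClickEvent> events)
    {
        var counts = new Dictionary<string, int>();
        foreach (var e in events)
        {
            // Filter: skip events without a page URL.
            if (string.IsNullOrEmpty(e.PageUrl)) continue;

            // Aggregate: read the running count for this key (0 if absent)
            // and store the incremented value.
            counts.TryGetValue(e.PageUrl, out var current);
            counts[e.PageUrl] = current + 1;
        }
        return counts;
    }

    public static void Main()
    {
        var events = new[]
        {
            new ClickEvent { PageUrl = "/home", Timestamp = DateTime.UtcNow },
            new ClickEvent { PageUrl = "/about", Timestamp = DateTime.UtcNow },
            new ClickEvent { PageUrl = "", Timestamp = DateTime.UtcNow }, // filtered out
            new ClickEvent { PageUrl = "/home", Timestamp = DateTime.UtcNow },
        };

        var counts = CountClicks(events);
        Console.WriteLine("/home: " + counts["/home"]);   // /home: 2
        Console.WriteLine("/about: " + counts["/about"]); // /about: 1
    }
}
```

The streaming version applies the same per-event update as each click arrives instead of looping over a finished batch.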
3. Access Aggregated Data
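Retrieve and display the click counts from the state store. Because the emit loop above never exits, run this code on a separate thread or bound the loop first.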
    // Access the aggregate state store data
    var aggregateStore = stream.GetStateStoreByName<InMemoryStateStore<string, int>>("ClickAggregateStore");

    // Access the groupby state store data
    var groupByStore = stream.GetStateStoreByName<InMemoryStateStore<string, List<ClickEvent>>>("ClickGroupStore");

    if (aggregateStore != null)
    {
        Console.WriteLine("\nAggregated Click Counts:");
        foreach (var kvp in aggregateStore.GetAll())
        {
            Console.WriteLine($"Page: {kvp.Key}, Clicks: {kvp.Value}");
        }
    }
    else
    {
        Console.WriteLine("Aggregate state store not found.");
    }
}
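One way to read counts while events are still arriving is to move the producer onto a background task and leave the calling thread free for reads. The sketch below illustrates that arrangement without the Cortex library: a `ConcurrentDictionary<string, int>` stands in for the state store, and `BackgroundEmitterSketch` and `Run` are hypothetical names, not part of the framework. The loop is bounded so the program terminates.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BackgroundEmitterSketch
{
    // Hypothetical helper (not a Cortex API): emits `eventCount` random clicks
    // on a background task and returns the final per-page counts.
    public static ConcurrentDictionary<string, int> Run(int eventCount)
    {
        var counts = new ConcurrentDictionary<string, int>();
        var pages = new[] { "/home", "/about", "/contact", "/products" };

        var producer = Task.Run(() =>
        {
            var random = new Random();
            for (var i = 0; i < eventCount; i++)
            {
                var page = pages[random.Next(pages.Length)];
                // Thread-safe insert-or-increment for this page.
                counts.AddOrUpdate(page, 1, (_, current) => current + 1);
                Thread.Sleep(1); // Simulate click rate
            }
        });

        // The calling thread reads totals while the producer is still running.
        while (!producer.IsCompleted)
        {
            Console.WriteLine($"Clicks so far: {counts.Values.Sum()}");
            Thread.Sleep(50);
        }

        producer.Wait(); // Surface any exception thrown by the producer.
        return counts;
    }

    public static void Main()
    {
        var counts = Run(200);
        foreach (var kvp in counts)
        {
            Console.WriteLine($"Page: {kvp.Key}, Clicks: {kvp.Value}");
        }
    }
}
```

In the Cortex example, the same idea would mean calling `stream.Emit` from the background task and reading the state store from the main thread; whether that fits a given deployment depends on how the application is hosted.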
We welcome contributions from the community! Whether it's reporting bugs, suggesting features, or submitting pull requests, your involvement helps improve Cortex for everyone.
1. Create a feature branch:
git checkout -b feature/YourFeature
2. Commit your changes:
git commit -m "Add your feature"
3. Push the branch:
git push origin feature/YourFeature
4. Open a pull request. Describe your changes and submit the pull request for review.
This project is licensed under the MIT License.
Cortex is an open-source project maintained by BuilderSoft. Your support helps us continue developing and improving Cortex. Consider sponsoring us to contribute to the future of resilient streaming platforms.
Contact Us: cortex@buildersoft.io
We'd love to hear from you! Whether you have questions, feedback, or need support, feel free to reach out.
Thank you for using Cortex Data Framework! We hope it empowers you to build scalable and efficient data processing pipelines effortlessly.
Built with ❤️ by the BuilderSoft team.