zmoog opened 1 year ago
I will take this opportunity to use the Elasticsearch client for Python https://elasticsearch-py.readthedocs.io/en/v8.6.2/
```python
import os

from elasticsearch import Elasticsearch


class Shipper:
    def __init__(self, endpoint: str, api_key: str) -> None:
        self.endpoint = endpoint
        self.api_key = api_key
        self.client = Elasticsearch(endpoint, api_key=self.api_key)

    def info(self):
        return self.client.info()

    def send(self, message):
        pass

    def flush(self):
        pass

    @classmethod
    def from_environment(cls) -> "Shipper":
        return cls(
            endpoint=os.environ["ELASTICSEARCH_ENDPOINT"],
            api_key=os.environ["ELASTICSEARCH_API_KEY"],
        )
```
Little test drive:
```
$ python
Python 3.10.10 (main, Feb 15 2023, 00:17:25) [Clang 14.0.0 (clang-1400.0.29.202)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from app.adapters.elasticsearch import Shipper
>>> s = Shipper.from_environment()
>>> s.info()
ObjectApiResponse({'name': 'instance-0000000001', 'cluster_name': '2d431bd3ac474926b886569aaefcbd84', 'cluster_uuid': 'x2KFKyo8RE2lfDurKPNCtw', 'version': {'number': '8.5.3', 'build_flavor': 'default', 'build_type': 'docker', 'build_hash': '4ed5ee9afac63de92ec98f404ccbed7d3ba9584e', 'build_date': '2022-12-05T18:22:22.226119656Z', 'build_snapshot': False, 'lucene_version': '9.4.2', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})
>>>
```
Minimal change to ship data using the `bulk()` helper:
```python
import datetime
import os

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


class Shipper:
    def __init__(self, endpoint: str, api_key: str) -> None:
        self.endpoint = endpoint
        self.api_key = api_key
        self.client = Elasticsearch(endpoint, api_key=self.api_key)
        self.actions = []
        self.index_or_datastream = "zmoog-esf-logs"

    def info(self):
        return self.client.info()

    def send(self, message) -> None:
        now = datetime.datetime.now(datetime.timezone.utc)
        self.actions.append({
            "_index": self.index_or_datastream,
            "_op_type": "create",
            "_source": {
                "@timestamp": now.isoformat(),
                "message": message,
            },
        })
        # Flush automatically once the buffer reaches 100 actions.
        if len(self.actions) >= 100:
            self.flush()

    def flush(self):
        if len(self.actions) > 0:
            bulk(self.client, self.actions)
            self.actions = []

    @classmethod
    def from_environment(cls) -> "Shipper":
        return cls(
            endpoint=os.environ["ELASTICSEARCH_ENDPOINT"],
            api_key=os.environ["ELASTICSEARCH_API_KEY"],
        )
```
Test drive!
```
$ python
Python 3.10.10 (main, Feb 15 2023, 00:17:25) [Clang 14.0.0 (clang-1400.0.29.202)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from app.adapters.elasticsearch import Shipper
>>> s = Shipper.from_environment()
>>> s.send("hey")
>>> s.flush()
>>>
```
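The buffering in `send()`/`flush()` boils down to a small pattern that can be sanity-checked without a live cluster. Here is a minimal sketch of just that pattern; the `Buffer` class and `flush_fn` callback are illustrative names, not code from the project:

```python
# Minimal sketch of the buffer-and-flush pattern used by Shipper: collect
# actions and hand them to a flush callback once a threshold is reached.
class Buffer:
    def __init__(self, flush_fn, max_actions: int = 100):
        self.flush_fn = flush_fn
        self.max_actions = max_actions
        self.actions = []

    def send(self, message) -> None:
        self.actions.append(message)
        # Flush automatically once the buffer is full.
        if len(self.actions) >= self.max_actions:
            self.flush()

    def flush(self) -> None:
        if self.actions:
            self.flush_fn(self.actions)
            self.actions = []


batches = []
buf = Buffer(batches.append, max_actions=100)
for i in range(250):
    buf.send(f"msg {i}")
buf.flush()
# 250 messages end up in three batches: 100 + 100 + 50.
print([len(b) for b in batches])  # [100, 100, 50]
```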
I created a dead simple data view for this project:
And here's the document in Elasticsearch:
There are still A LOT of pieces missing, but we can land some bytes into an Elasticsearch cluster. Having a ☕ to celebrate.
Testing it!
Set up the Shipper in the Azure Function.
```python
from typing import List

import azure.functions as func

from app.adapters.eventhub import Router
from app.adapters.elasticsearch import Shipper

router = Router(Shipper.from_environment())


def main(events: List[func.EventHubEvent]):
    router.dispatch(events)
```
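The `Router` adapter isn't shown in this issue, so here is only a hypothetical sketch of what it might look like; the names and behavior are guesses, not the actual implementation:

```python
# Hypothetical sketch of the Router adapter: decode each event body and hand
# it to the shipper, flushing once per invocation.
class Router:
    def __init__(self, shipper):
        self.shipper = shipper

    def dispatch(self, events) -> None:
        for event in events:
            # EventHubEvent.get_body() returns the raw payload as bytes.
            self.shipper.send(event.get_body().decode("utf-8"))
        # Push any buffered actions to Elasticsearch before the invocation ends.
        self.shipper.flush()
```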
Run the function locally on the Intel NUC (as of today, Azure Functions don't run on ARM Macs):
```
(venv) zmoog@nuc:~/code/projects/zmoog/esf-for-azure/v1/forwarder$ pip install -r requirements.txt
Requirement already satisfied: azure-functions in ./venv/lib/python3.8/site-packages (from -r requirements.txt (line 5)) (1.13.3)
Collecting elasticsearch==8.6.2
  Downloading elasticsearch-8.6.2-py3-none-any.whl (385 kB)
     |████████████████████████████████| 385 kB 2.5 MB/s
Collecting elastic-transport<9,>=8
  Downloading elastic_transport-8.4.0-py3-none-any.whl (59 kB)
     |████████████████████████████████| 59 kB 4.4 MB/s
Collecting urllib3<2,>=1.26.2
  Downloading urllib3-1.26.15-py2.py3-none-any.whl (140 kB)
     |████████████████████████████████| 140 kB 10.5 MB/s
Collecting certifi
  Using cached certifi-2022.12.7-py3-none-any.whl (155 kB)
Installing collected packages: urllib3, certifi, elastic-transport, elasticsearch
Successfully installed certifi-2022.12.7 elastic-transport-8.4.0 elasticsearch-8.6.2 urllib3-1.26.15

(venv) zmoog@nuc:~/code/projects/zmoog/esf-for-azure/v1/forwarder$ func start
Found Python version 3.8.10 (python3).

Azure Functions Core Tools
Core Tools Version:       4.0.4895 Commit hash: N/A (64-bit)
Function Runtime Version: 4.13.0.19486

Functions:
        process_eventhub: eventHubTrigger

For detailed output, run func with --verbose flag.
[2023-03-12T07:00:32.758Z] Worker process started and initialized.
[2023-03-12T07:00:37.231Z] Host lock lease acquired by instance ID '000000000000000000000000000C2005'.
[2023-03-12T07:00:37.475Z] Executing 'Functions.process_eventhub' (Reason='(null)', Id=2c027b8e-b034-4376-8ee0-5c82fb187129)
[2023-03-12T07:00:37.477Z] Trigger Details: PartionId: 3, Offset: 397912-397912, EnqueueTimeUtc: 2023-03-12T00:34:21.7820000+00:00-2023-03-12T00:34:21.7820000+00:00, SequenceNumber: 46-46, Count: 1
[2023-03-12T07:00:37.757Z] Python EventHub trigger processed an event: {"records": [{"correlationId": "694350da-c81c-4913-baea-d9bee011a225", "time": "2023-03-12T00:24:31.8501481Z", "eventDataId": "0236834d-f3dd-4445-b7d8-ba3904460e59", "resourceId": "/SUBSCRIPTIONS/123RESOURCEGROUPS/xxx/PROVIDERS/MICROSOFT.NETWORK/VIRTUALNETWORKS/TEST-VNET", "operationName": "Microsoft.Advisor/recommendations/available/action", "resultType": "Active", "properties": {"recommendationSchemaVersion": "1.0", "recommendationCategory": "HighAvailability", "recommendationImpact": "Medium", "recommendationName": "Use NAT gateway for outbound connectivity", "recommendationResourceLink": "https://portal.azure.com/#blade/Microsoft_Azure_Expert/RecommendationListBlade/source/ActivityLog/recommendationTypeId/56f0c458-521d-4b8b-a704-c0a099483d19/resourceId/%2Fsubscriptions%2F12cabcb4-86e8-404f-a3d2-1dc9982f45ca%2Fresourcegroups%xxx-test%2Fproviders%2Fmicrosoft.network%2Fvirtualnetworks%2Ftest-vnet", "recommendationType": "56f0c458-521d-4b8b-a704-c0a099483d19"}, "location": "global", "level": "Informational", "category": "Recommendation", "operationVersion": "2017-03-31", "resultSignature": "Succeeded", "resultDescription": "A new recommendation is available.", "durationMs": 10, "callerIpAddress": "0.0.0.0", "identity": {"claims": {"http://schemas.xmlsoap.org/ws/2005/05/identity/claims/emailaddress": "Microsoft.Advisor"}}}]}
[2023-03-12T07:00:38.281Z] PUT https://workshop.es.eastus2.azure.elastic-cloud.com:443/_bulk [status:200 duration:0.556s]
[2023-03-12T07:00:38.375Z] Executed 'Functions.process_eventhub' (Succeeded, Id=2c027b8e-b034-4376-8ee0-5c82fb187129, Duration=1025ms)
```
I created a small CLI tool, https://github.com/zmoog/eventhubs/, to send and receive data from an event hub:

```
export EVENTHUB_CONNECTION_STRING="whatever"
export EVENTHUB_NAME="

$ eh eventdata send --text '{"message": "Hello Spank 9"}'
Sending event data to Azure Event Hubs

$ eh eventdata receive
Receiving events from Azure Event Hubs
Received event from partition 1: {"message": "Hello Spank 9"}
```

This will be handy during development.
Inspecting the `metadata` property in the event data object from the event hub:
```json
{
  "SequenceNumberArray": [59],
  "PartitionContext": {
    "FullyQualifiedNamespace": "whatever.servicebus.windows.net",
    "EventHubName": "azurelogs",
    "ConsumerGroup": "$Default",
    "PartitionId": "2"
  },
  "PropertiesArray": [{}],
  "OffsetArray": ["616008"],
  "PartitionKeyArray": [],
  "SystemPropertiesArray": [
    {
      "x-opt-sequence-number": 59,
      "x-opt-offset": 616008,
      "x-opt-enqueued-time": "2023-03-13T01:25:02.25+00:00",
      "SequenceNumber": 59,
      "Offset": 616008,
      "PartitionKey": null,
      "EnqueuedTimeUtc": "2023-03-13T01:25:02.25"
    }
  ],
  "EnqueuedTimeUtcArray": ["2023-03-13T01:25:02.25"]
}
```
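For reference, pulling fields out of that metadata blob is plain JSON work. A quick sketch, using a shortened copy of the payload above:

```python
import json

# The metadata payload from the event hub trigger, shortened to the fields
# used below.
metadata = json.loads("""
{
  "SequenceNumberArray": [59],
  "PartitionContext": {
    "FullyQualifiedNamespace": "whatever.servicebus.windows.net",
    "EventHubName": "azurelogs",
    "ConsumerGroup": "$Default",
    "PartitionId": "2"
  },
  "OffsetArray": ["616008"]
}
""")

partition_id = metadata["PartitionContext"]["PartitionId"]
sequence_number = metadata["SequenceNumberArray"][0]
print(partition_id, sequence_number)  # 2 59
```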
I am now able to send an event to Elasticsearch, but I want to set a stretch goal: send real activity logs to an actual Azure activity logs data stream and see them on the out-of-the-box dashboard.
Next steps:

- Change the eventhubs CLI to send data in batches and see if this triggers receiving data wrapped in a `records` object (with the current version, data is received as it was sent).
- Set up a diagnostic setting to route activity logs to my testing event hub. I am currently using the generic event hub integration, so I need to add actual routing from event hub -> data stream.
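The `records` envelope mentioned in the batching step is plain JSON work to build. Here is a sketch of constructing that payload; the envelope shape matches the sample activity-log event earlier in this thread, while the message contents are made up:

```python
import json

# Wrap individual messages in a single "records" envelope, mimicking the
# shape Azure diagnostic settings use when batching events.
messages = [{"message": f"Hello Spank {i}"} for i in range(3)]
payload = json.dumps({"records": messages})

decoded = json.loads(payload)
print(len(decoded["records"]))  # 3
```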
If I send plain text (not a JSON string) to the Event Hub:

```
eh -v eventdata send --text 'Hey dude!'
```

I get the following error:
```
[2023-03-26T08:31:11.980Z] Executing 'Functions.process_eventhub' (Reason='(null)', Id=f4cd960d-d352-4433-a80b-756194b8fef0)
[2023-03-26T08:31:11.980Z] Trigger Details: PartionId: 0, Offset: 4294967712-4294967712, EnqueueTimeUtc: 2023-03-26T08:31:11.8630000+00:00-2023-03-26T08:31:11.8630000+00:00, SequenceNumber: 53-53, Count: 1
[2023-03-26T08:31:11.985Z] Executed 'Functions.process_eventhub' (Failed, Id=f4cd960d-d352-4433-a80b-756194b8fef0, Duration=9ms)
[2023-03-26T08:31:11.985Z] System.Private.CoreLib: Exception while executing function: Functions.process_eventhub. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'events'. Microsoft.Azure.WebJobs.Host: Binding parameters to complex objects (such as 'Object') uses Json.NET serialization.
[2023-03-26T08:31:11.986Z] 1. Bind the parameter type as 'string' instead of 'Object' to get the raw values and avoid JSON deserialization, or
[2023-03-26T08:31:11.986Z] 2. Change the queue payload to be valid json. The JSON parser failed: Unexpected character encountered while parsing value: H. Path '', line 0, position 0.
[2023-03-26T08:31:11.986Z] .
```
By default the trigger expects an object (a string that contains a JSON object), so if you're sending a different type (a string containing an English sentence, in my test) you have to add or change the `dataType` value in the function.json file:
```json
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "events",
      "direction": "in",
      "eventHubName": "testing",
      "connection": "receiverConnectionString",
      "cardinality": "many",
      "consumerGroup": "$Default",
      "dataType": "string"
    }
  ]
}
```
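With `dataType` set to `string`, the function receives the raw text and has to decide for itself whether it's JSON. A small helper along these lines would do; this is a sketch, not code from the project:

```python
import json


def parse_event_body(body: str):
    """Try to decode the event body as JSON; fall back to the raw string
    for plain-text events like 'Hey dude!'."""
    try:
        return json.loads(body)
    except json.JSONDecodeError:
        return body


print(parse_event_body('{"message": "Hello Spank 9"}'))  # {'message': 'Hello Spank 9'}
print(parse_event_body("Hey dude!"))  # Hey dude!
```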
It's weird that I can't find any mention of `dataType` at https://learn.microsoft.com/en-us/azure/azure-functions/functions-bindings-event-hubs-trigger?tabs=in-process%2Cfunctionsv2%2Cextensionv5&pivots=programming-language-python

Credit goes to https://learn.microsoft.com/en-us/answers/questions/57841/java-eventhubtrigger-not-working
To wrap up this change, we must document how to install, set up, and run the forwarder.
Let's fit everything into the README file, adding dedicated docs as a second option.
Next step: create a Linux machine and test everything end to end.
Relates: #16
We have an Azure Function triggered by logs sent to an event hub.
Now it's time to pick up these messages and send them to an Elasticsearch cluster.