snstac / pytak

PyTAK is a Python package for rapid TAK integration.
https://pytak.readthedocs.io/en/stable/
Apache License 2.0

Help with tls certificate and connecting to TAK server #30

Closed remilamoureuxlevesque closed 1 year ago

remilamoureuxlevesque commented 1 year ago

Hi, I am having issues using your library. I would like to send coordinates as CoT to a TAK Server. I have a server up and running on a Raspberry Pi, set up with the TLS certificate, but when I try to run the tak_pong example, I receive the following error: Exception: Please specify COT_URL as a full URL, including '://', for example: tcp://tak.example.com:1234

Do you have any idea why I get this error? The only thing I changed was adding the part required to make it work with the TLS certificates.

Thanks in advance for your help.

#!/usr/bin/env python3

import asyncio
import xml.etree.ElementTree as ET

from configparser import ConfigParser

import pytak

class MySerializer(pytak.QueueWorker):
    """
    Defines how you process or generate your Cursor-On-Target Events.
    From there it adds the COT Events to a queue for TX to a COT_URL.
    """

    async def handle_data(self, data):
        """
        Handles pre-COT data and serializes to COT Events, then puts on queue.
        """
        event = data
        await self.put_queue(event)

    async def run(self, number_of_iterations=-1):
        """
        Runs the loop for processing or generating pre-COT data.
        """
        while 1:
            data = tak_pong()
            await self.handle_data(data)
            await asyncio.sleep(20)

def tak_pong():
    """
    Generates a simple takPong COT Event.
    """
    root = ET.Element("event")
    root.set("version", "2.0")
    root.set("type", "t-x-d-d")
    root.set("uid", "takPong")
    root.set("how", "m-g")
    root.set("time", pytak.cot_time())
    root.set("start", pytak.cot_time())
    root.set("stale", pytak.cot_time(3600))
    return ET.tostring(root)

async def main():
    """
    The main definition of your program, sets config params and
    adds your serializer to the asyncio task list.
    """
    config = ConfigParser()
    config["mycottool"] = {"COT_URL": "tcp://192.168.0.115:8089"}
    config["mycottool"] = {"PYTAK_TLS_CLIENT_CERT": "user.pem"}
    config = config["mycottool"]

    # Initializes worker queues and tasks.
    clitool = pytak.CLITool(config)
    await clitool.setup()

    # Add your serializer to the asyncio task list.
    clitool.add_tasks(set([MySerializer(clitool.tx_queue, config)]))

    # Start all tasks.
    await clitool.run()

if __name__ == "__main__":
    asyncio.run(main())
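
One thing worth checking in main(): assigning a dict to config["mycottool"] a second time replaces the whole section rather than adding to it, so the COT_URL set on the first line is gone by the time the config reaches pytak.CLITool. This may explain the error, though it has not been verified against PyTAK itself. The sketch below uses only the standard library to show the behaviour; the helper names build_overwritten and build_merged are hypothetical and exist only for illustration.

```python
from configparser import ConfigParser


def build_overwritten():
    """Mirrors the two separate assignments from the script above."""
    config = ConfigParser()
    config["mycottool"] = {"COT_URL": "tcp://192.168.0.115:8089"}
    # This second assignment clears the section before writing the new key.
    config["mycottool"] = {"PYTAK_TLS_CLIENT_CERT": "user.pem"}
    return config["mycottool"]


def build_merged():
    """Sets both keys in a single assignment so neither is lost."""
    config = ConfigParser()
    config["mycottool"] = {
        "COT_URL": "tcp://192.168.0.115:8089",
        "PYTAK_TLS_CLIENT_CERT": "user.pem",
    }
    return config["mycottool"]


overwritten = build_overwritten()
merged = build_merged()

# Report whether COT_URL survived in each variant.
print("COT_URL present after two assignments:", "COT_URL" in overwritten)
print("COT_URL present after one assignment:", "COT_URL" in merged)
```

Adding keys one at a time (config["mycottool"]["PYTAK_TLS_CLIENT_CERT"] = "user.pem") after the first assignment would also keep both values. Separately, a TLS connection may need a different URL scheme than tcp://, and possibly a client key setting as well; that is an assumption to confirm in the PyTAK configuration documentation.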