B23admin / nifi-stateless-operator

An Operator for scheduling and executing NiFi Flows as Jobs on Kubernetes
Apache License 2.0

While executing a NiFi job asking for Input Port #4

Closed sanjiv1980 closed 4 years ago

sanjiv1980 commented 5 years ago

I am new to NiFi, and NiFi-Fn in particular is very new to me. I have gone through the documentation and deployed it on an EKS (AWS) cluster. When I execute a NiFi job (any job), it throws an exception saying that an input port is missing. I don't understand why an input port is required.

Note: Is this related to RPG (Remote Process Group)? Could you please explain, and if possible attach a simple template? That would be a great help.

kubectl logs nififn-listing-s3-job-tf6sg -n nifi-fn-operator-system
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/nifi-1.8.0/lib/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/nififn/lib/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/nififn/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
Exception in thread "main" java.lang.IllegalArgumentException: Flow does not have an input port...
    at org.apache.nifi.fn.core.FnFlow.enqueueFlowFile(FnFlow.java:254)
    at org.apache.nifi.fn.runtimes.Program.main(Program.java:88)

dbkegley commented 5 years ago

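For nifi-fn to initialize properly, there must be a single input port at the root level of the NiFi canvas. Any FlowFiles defined in the flowFiles field of the manifest are sent to this input port and processed by the flow. Without an input port, enqueueing those FlowFiles fails with the "Flow does not have an input port" exception shown above. For example: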

flowFiles:
  - "absolute.path-/path/to/input/data/;filename-testfile.txt"
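Each flowFiles entry above appears to pack FlowFile attributes into a single string, with `;` between attributes and `-` between a key and its value. As a rough illustration of that reading (an assumption inferred from the example, not taken from the nifi-fn parser), the entry could be split into an attribute map like this. The function name `parse_flowfile_spec` is hypothetical.

```python
def parse_flowfile_spec(spec: str) -> dict[str, str]:
    """Split a 'key-value;key-value' string into an attribute dict.

    Assumption: the first '-' in each pair separates key from value,
    so keys must not contain '-' themselves (values may).
    """
    attributes = {}
    for pair in spec.split(";"):
        if not pair:
            continue  # tolerate a trailing ';'
        key, sep, value = pair.partition("-")
        if not sep:
            raise ValueError(f"missing '-' separator in {pair!r}")
        attributes[key] = value
    return attributes


if __name__ == "__main__":
    spec = "absolute.path-/path/to/input/data/;filename-testfile.txt"
    print(parse_flowfile_spec(spec))
```

Under this reading, the entry describes one FlowFile with two attributes, `absolute.path` and `filename`, which is delivered to the root-level input port.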
sanjiv1980 commented 5 years ago

Hi @dbkegley, I just wanted to ask: when submitting a job, we use the Docker image "samhjelmfelt/nifi-fn:latest", as in this manifest:

apiVersion: nififn.b23.io/v1alpha1
kind: NiFiFn
metadata:
  labels:
    controller-tools.k8s.io: "1.0"
  name: nififn-file
  namespace: nifi-fn-operator-system
spec:
  image: "samhjelmfelt/nifi-fn:latest"
  registryUrl: "http://registry-service:18080"
  bucket: "37602fe6-0d58-4179-a34a-aa3c494a6cca"
  flow: "16887b00-9669-4c6c-8991-03ac6e6c24be"
  flowVersion: 1
  flowFiles:
  - "absolute.path-/opt/nifi/nifi-current/test/input/;filename-a.txt"

How is this different from NiFi-Stateless? It would be helpful if you could describe the difference.

dbkegley commented 5 years ago

This operator is currently just a Kubernetes wrapper for the functionality that nifi-stateless already provides. In the future, I would like to add functionality that makes it more useful for scheduling and triggering stateless flows. For now, it's really just a thin wrapper that lets you manage your stateless flows as a Kubernetes resource.

This manifest can be used for testing the nifi-stateless image until I make the updates to the operator.

apiVersion: v1
kind: ConfigMap
metadata:
  name: flow-json
data:
  config.json: |
    {
      "registryUrl": "http://registry-service:18080",
      "bucketId": "8444dc91-00f3-415c-a965-256ffa28c3f5",
      "flowId": "c2408c92-6122-4451-b2a6-bd356e8afd38",
      "flowVersion": "-1",
      "flowFiles":[{
          "absolute.path": "/tmp/nifistateless/input/",
          "filename": "test.txt",
          "nifi_content": "hello"
      },
      {
          "absolute.path": "/tmp/nifistateless/input/",
          "filename": "test2.txt",
          "nifi_content": "hi"
      }],
      "parameters": {
        "DestinationDirectory" : "/tmp/nifistateless/output2/",
        "Username" : "jdoe",
        "Password": { "sensitive": "true", "value": "password" }
      }
    }
---
apiVersion: batch/v1
kind: Job
metadata:
  name: stateless-flow
spec:
  template:
    spec:
      containers:
      - name: flow
        imagePullPolicy: Always
        image: dbkegley/nifi-stateless:1.10.0-SNAPSHOT
        #args: ["RunFromFlowXml", "Once", "--file", "/opt/nifi/config.json"] 
        args: ["RunFromRegistry", "Once", "--file", "/opt/nifi/config.json"]
        volumeMounts:
        - name: flow-config
          mountPath: /opt/nifi/config.json
          subPath: config.json
        - name: tmp-volume
          mountPath: /tmp
      restartPolicy: Never
      volumes:
      - name: tmp-volume
        emptyDir: {}
      - name: flow-config
        configMap:
          name: flow-json
  backoffLimit: 4
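
If the config.json embedded in the ConfigMap above needs to be produced for several flows, it may be easier to generate it than to edit it by hand. The sketch below only mirrors the keys shown in that manifest (registryUrl, bucketId, flowId, flowVersion, flowFiles, parameters). The function name `build_stateless_config` and its defaults are hypothetical, and it does not check the result against what nifi-stateless actually accepts.

```python
import json


def build_stateless_config(registry_url, bucket_id, flow_id, flow_files,
                           parameters=None, flow_version=-1):
    """Return a config.json string shaped like the ConfigMap example.

    flow_files is a list of dicts of FlowFile attributes; the manifest
    above carries the payload in a "nifi_content" entry.
    """
    config = {
        "registryUrl": registry_url,
        "bucketId": bucket_id,
        "flowId": flow_id,
        # The example manifest writes the version as a string ("-1").
        "flowVersion": str(flow_version),
        "flowFiles": list(flow_files),
        "parameters": dict(parameters or {}),
    }
    return json.dumps(config, indent=2)


if __name__ == "__main__":
    print(build_stateless_config(
        registry_url="http://registry-service:18080",
        bucket_id="8444dc91-00f3-415c-a965-256ffa28c3f5",
        flow_id="c2408c92-6122-4451-b2a6-bd356e8afd38",
        flow_files=[{
            "absolute.path": "/tmp/nifistateless/input/",
            "filename": "test.txt",
            "nifi_content": "hello",
        }],
        parameters={"DestinationDirectory": "/tmp/nifistateless/output2/"},
    ))
```

The printed JSON can be pasted under the `config.json: |` key of the ConfigMap, indented to match the surrounding YAML.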