spring-cloud / spring-cloud-dataflow

A microservices-based Streaming and Batch data processing in Cloud Foundry and Kubernetes
https://dataflow.spring.io
Apache License 2.0
1.11k stars 582 forks source link

Java DSL doesn't support multiline stream definition #4225

Open DemianTinkiel opened 4 years ago

DemianTinkiel commented 4 years ago

Description: Entering a multiline stream definition causes a NPE on the server

Release versions: spring-cloud-dataflow-rest-client = 2.6.1 +

{
  "versionInfo": {
    "implementation": {
      "name": "spring-cloud-dataflow-server",
      "version": "2.6.3"
    },
    "core": {
      "name": "Spring Cloud Data Flow Core",
      "version": "2.6.3"
    },
    "dashboard": {
      "name": "Spring Cloud Dataflow UI",
      "version": "2.5.1"
    },
    "shell": {
      "name": "Spring Cloud Data Flow Shell",
      "version": "2.6.3",
      "url": "https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-dataflow-shell/2.6.3/spring-cloud-dataflow-shell-2.6.3.jar"
    }
  },
  "featureInfo": {
    "streamsEnabled": true,
    "tasksEnabled": true,
    "schedulesEnabled": false,
    "grafanaEnabled": false,
    "wavefrontEnabled": false
  },
  "securityInfo": {
    "isAuthenticationEnabled": false,
    "isAuthenticated": false,
    "username": null,
    "roles": []
  },
  "runtimeEnvironment": {
    "appDeployer": {
      "platformSpecificInfo": {},
      "deployerImplementationVersion": "2.5.1",
      "deployerName": "Spring Cloud Skipper Server",
      "deployerSpiVersion": "2.5.2",
      "javaVersion": "1.8.0_192",
      "platformApiVersion": "",
      "platformClientVersion": "",
      "platformHostVersion": "",
      "platformType": "Skipper Managed",
      "springBootVersion": "2.2.8.RELEASE",
      "springVersion": "5.2.7.RELEASE"
    },
    "taskLaunchers": [
      {
        "platformSpecificInfo": {},
        "deployerImplementationVersion": "2.4.1",
        "deployerName": "LocalTaskLauncher",
        "deployerSpiVersion": "2.4.1",
        "javaVersion": "1.8.0_192",
        "platformApiVersion": "Linux 5.6.13-100.fc30.x86_64",
        "platformClientVersion": "5.6.13-100.fc30.x86_64",
        "platformHostVersion": "5.6.13-100.fc30.x86_64",
        "platformType": "Local",
        "springBootVersion": "2.2.8.RELEASE",
        "springVersion": "5.2.7.RELEASE"
      }
    ]
  },
  "grafanaInfo": {
    "url": "",
    "token": "",
    "refreshInterval": 15
  },
  "monitoringDashboardInfo": {
    "url": "",
    "token": "",
    "refreshInterval": 15,
    "source": "default-scdf-source"
  }
}

Steps to reproduce: I know that if I enter the following stream definition in SCDF gui, it can be deployed

ftp > :messages
sftp > :messages
:messages > file
:messages > log

If I try to the same in java DSL:

String streamDefinition="ftp > :messages\n" +
        "sftp > :messages\n" +
        ":messages > file\n" +
        ":messages > log"
flowOperations.streamOperations().createStream(streamName, streamDefinition, streamName, true);

I get the server exception

Additional context: Stack trace

2020-10-30 04:06:45.793 ERROR 1 --- [nio-9393-exec-9] o.s.c.d.s.c.RestControllerAdvice         : Caught exception while handling a request

java.lang.NullPointerException: null
    at org.springframework.cloud.dataflow.core.dsl.StreamParser.eatDestinationReference(StreamParser.java:359)
    at org.springframework.cloud.dataflow.core.dsl.StreamParser.eatSinkDestination(StreamParser.java:282)
    at org.springframework.cloud.dataflow.core.dsl.StreamParser.eatStream(StreamParser.java:158)
    at org.springframework.cloud.dataflow.core.dsl.StreamParser.parse(StreamParser.java:74)
    at org.springframework.cloud.dataflow.core.DefaultStreamDefinitionService.parse(DefaultStreamDefinitionService.java:42)
    at org.springframework.cloud.dataflow.server.service.impl.DefaultStreamService.createStreamDefinition(DefaultStreamService.java:425)
    at org.springframework.cloud.dataflow.server.service.impl.DefaultStreamService.createStream(DefaultStreamService.java:385)
    at org.springframework.cloud.dataflow.server.service.impl.DefaultStreamService$$FastClassBySpringCGLIB$$89697014.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:367)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:118)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
    at org.springframework.cloud.dataflow.server.service.impl.DefaultStreamService$$EnhancerBySpringCGLIB$$4467d537.createStream(<generated>)
    at org.springframework.cloud.dataflow.server.controller.StreamDefinitionController.save(StreamDefinitionController.java:138)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:105)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:879)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:793)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1040)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:943)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
    at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:660)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:741)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:103)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:109)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:373)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:868)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1590)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:748)
DemianTinkiel commented 4 years ago

I know that going through the GUI it actually creates sub-streams for each line. If it's intended for one to the same by making a DSL call for each, then this should be documented!

Otherwise, this would be a great feature to have:

String streamDefinition="ftp > :messages\n" +
        "sftp > :messages\n" +
        ":messages > file\n" +
        ":messages > log";
String [] names=new String
flowOperations.streamOperations().createStream(streamName, streamDefinition, streamName, true,names);

or to maintain API consistency, auto-generate the sub-stream name from original stream name

sabbyanandan commented 3 years ago

Hi, @DemianTinkiel. Even though the streams might have a relationship (because of the existing destination binding), we do treat them as individual streams behind the scenes, and that's by design.

Having to automatically infer it and create stream definitions based on convention sounds like a potential improvement to this workflow. If you want to take a stab at working this on a PR, we can collaborate.