apache / dolphinscheduler

Apache DolphinScheduler is the modern data orchestration platform. Agile to create high performance workflow with low-code
https://dolphinscheduler.apache.org/
Apache License 2.0
[Bug] [DataSource & UI] Snowflake Datasource Connect Fail #16791

Open sylphy-zh opened 1 week ago

sylphy-zh commented 1 week ago

What happened

DS: 3.2.x. Snowflake JDBC Driver: 3.19.0 (changing the driver version gives the same problem; the source code logic is the same).

Two problems were found.

【P1】Create DataSource: the connection test fails. Error log:

[INFO] 2024-11-11 16:30:00.588 +0800 o.a.d.p.d.a.u.DataSourceUtils:[57] - Parameters map:SnowFlakeConnectionParam{user='DSUSER', password='******', address='jdbc:snowflake://apo149212.ap-southeast-1.snowflakecomputing.com:443', database='null', jdbcUrl='jdbc:snowflake://wq49212.ap-southeast-1.snowflakecomputing.com:443/DH_PROD', driverLocation='null', driverClassName='net.snowflake.client.jdbc.SnowflakeDriver', validationQuery='select 1', other='null'}
[ERROR] 2024-11-11 16:30:00.785 +0800 o.a.d.p.d.a.d.AbstractDataSourceProcessor:[130] - Check datasource connectivity for: SNOWFLAKE error
net.snowflake.client.jdbc.SnowflakeSQLException: Connection string is invalid. Unable to parse.
    at net.snowflake.client.jdbc.SnowflakeDriver.connect(SnowflakeDriver.java:228)
    at java.sql.DriverManager.getConnection(DriverManager.java:664)
    at java.sql.DriverManager.getConnection(DriverManager.java:247)

Check Snowflake Driver Source Code: SnowflakeDriver.java

@Override
  public Connection connect(String url, Properties info) throws SQLException {
    ConnectionParameters connectionParameters =
        overrideByFileConnectionParametersIfAutoConfiguration(url, info);

    if (connectionParameters.getUrl() == null) {
      // expected return format per the JDBC spec for java.sql.Driver#connect()
      throw new SnowflakeSQLException("Unable to connect to url of 'null'.");
    }
    if (!SnowflakeConnectString.hasSupportedPrefix(connectionParameters.getUrl())) {
      return null; // expected return format per the JDBC spec for java.sql.Driver#connect()
    }
    SnowflakeConnectString conStr =
        SnowflakeConnectString.parse(
            connectionParameters.getUrl(), connectionParameters.getParams());
    if (!conStr.isValid()) {
      throw new SnowflakeSQLException("Connection string is invalid. Unable to parse.");
    }
    return new SnowflakeConnectionV1(
        connectionParameters.getUrl(), connectionParameters.getParams());
  }

SnowflakeConnectString.java

public boolean isValid() {
    // invalid if host name is null or empty
    return !Strings.isNullOrEmpty(host);
  }

Given the parameters passed in, the host is clearly not empty, so isValid() should not fail on its own. Let us look at the parse method.

public static SnowflakeConnectString parse(String url, Properties info) {
    if (url == null) {
      logger.debug("Connect strings must be non-null");
      return INVALID_CONNECT_STRING;
    }
    int pos = url.indexOf(PREFIX);
    if (pos != 0) {
      logger.debug("Connect strings must start with jdbc:snowflake://");
      return INVALID_CONNECT_STRING; // not start with jdbc:snowflake://
    }
    String afterPrefix = url.substring(pos + PREFIX.length());
    String scheme;
    String host = null;
    int port = -1;
    Map<String, Object> parameters = new HashMap<>();
    try {
      URI uri;

      if (!afterPrefix.startsWith("http://") && !afterPrefix.startsWith("https://")) {
        // not explicitly specified
        afterPrefix = url.substring(url.indexOf("snowflake:"));
      }
      uri = new URI(afterPrefix);
      scheme = uri.getScheme();
      String authority = uri.getRawAuthority();
      String[] hostAndPort = authority.split(":");
      if (hostAndPort.length == 2) {
        host = hostAndPort[0];
        port = Integer.parseInt(hostAndPort[1]);
      } else if (hostAndPort.length == 1) {
        host = hostAndPort[0];
      }
      String queryData = uri.getRawQuery();

      if (!scheme.equals("snowflake") && !scheme.equals("http") && !scheme.equals("https")) {
        logger.debug("Connect strings must have a valid scheme: 'snowflake' or 'http' or 'https'");
        return INVALID_CONNECT_STRING;
      }
      if (Strings.isNullOrEmpty(host)) {
        logger.debug("Connect strings must have a valid host: found null or empty host");
        return INVALID_CONNECT_STRING;
      }
      if (port == -1) {
        port = 443;
      }
      String path = uri.getPath();
      if (!Strings.isNullOrEmpty(path) && !"/".equals(path)) {
        logger.debug("Connect strings must have no path: expecting empty or null or '/'");
        return INVALID_CONNECT_STRING;
      }
      String account = null;
      if (!Strings.isNullOrEmpty(queryData)) {
        String[] params = queryData.split("&");
        for (String p : params) {
          String[] keyVals = p.split("=");
          if (keyVals.length != 2) {
            continue; // ignore invalid pair of parameters.
          }
          try {
            String k = URLDecoder.decode(keyVals[0], "UTF-8");
            String v = URLDecoder.decode(keyVals[1], "UTF-8");
            if ("ssl".equalsIgnoreCase(k) && !getBooleanTrueByDefault(v)) {
              scheme = "http";
            } else if ("account".equalsIgnoreCase(k)) {
              account = v;
            }
            parameters.put(k.toUpperCase(Locale.US), v);
          } catch (UnsupportedEncodingException ex0) {
            logger.info("Failed to decode a parameter {}. Ignored.", p);
          }
        }
      }
      if ("snowflake".equals(scheme)) {
        scheme = "https"; // by default
      }

      if (info.size() > 0) {
        // NOTE: value in info could be any data type.
        // overwrite the properties
        for (Map.Entry<Object, Object> entry : info.entrySet()) {
          String k = entry.getKey().toString();
          Object v = entry.getValue();
          if ("ssl".equalsIgnoreCase(k) && !getBooleanTrueByDefault(v)) {
            scheme = "http";
          } else if ("account".equalsIgnoreCase(k)) {
            account = (String) v;
          }
          parameters.put(k.toUpperCase(Locale.US), v);
        }
      }

      if (parameters.get("ACCOUNT") == null && account == null && host.indexOf(".") > 0) {
        account = host.substring(0, host.indexOf("."));
        // If this is a global URL, then extract out the external ID part
        if (host.contains(".global.")) {
          account = account.substring(0, account.lastIndexOf('-'));
        }
        // Account names should not be altered. Set it to a value without org name
        // if it's a global url
        parameters.put("ACCOUNT", account);
      }

      if (Strings.isNullOrEmpty(account)) {
        logger.debug("Connect strings must contain account identifier");
        return INVALID_CONNECT_STRING;
      }

      // By default, don't allow underscores in host name unless the property is set to true via
      // connection properties.
      boolean allowUnderscoresInHost = false;
      if ("true"
          .equalsIgnoreCase(
              (String)
                  parameters.get(
                      SFSessionProperty.ALLOW_UNDERSCORES_IN_HOST
                          .getPropertyKey()
                          .toUpperCase()))) {
        allowUnderscoresInHost = true;
      }
      if (account.contains("_") && !allowUnderscoresInHost && host.startsWith(account)) {
        String account_wo_uscores = account.replaceAll("_", "-");
        host = host.replaceFirst(account, account_wo_uscores);
      }
      return new SnowflakeConnectString(scheme, host, port, parameters, account);
    } catch (URISyntaxException uriEx) {
      logger.warn(
          "Exception thrown while parsing Snowflake connect string. Illegal character in url.");
      return INVALID_CONNECT_STRING;
    } catch (Exception ex) {
      logger.warn("Exception thrown while parsing Snowflake connect string", ex);
      return INVALID_CONNECT_STRING;
    }
  }

Testing locally with the jdbcUrl parameter ("jdbc:snowflake://wq49212.ap-southeast-1.snowflakecomputing.com:443/DH_PROD") shows that the problem is here:

String path = uri.getPath();
      if (!Strings.isNullOrEmpty(path) && !"/".equals(path)) {
        logger.debug("Connect strings must have no path: expecting empty or null or '/'");
        return INVALID_CONNECT_STRING;
      }

The path '/DH_PROD' is not empty, so parse returns INVALID_CONNECT_STRING. That constant is built with an empty host, which is why the host check in isValid() fails:

private static SnowflakeConnectString INVALID_CONNECT_STRING = new SnowflakeConnectString("", "", -1, Collections.emptyMap(), "");
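The path check quoted above can be reproduced without the Snowflake driver on the classpath. The following is a minimal sketch using only java.net.URI; the class name PathCheckDemo and the method passesPathCheck are made up for illustration and are not part of the driver or of DolphinScheduler. It mirrors only the prefix handling and the "must have no path" test, not the rest of parse.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical demo class: mirrors only the path check from the quoted
// parse method and does not call the Snowflake driver.
class PathCheckDemo {
    private static final String PREFIX = "jdbc:snowflake://";

    // Returns true when the URL would get past the "must have no path" check.
    static boolean passesPathCheck(String url) throws URISyntaxException {
        if (url == null || !url.startsWith(PREFIX)) {
            return false;
        }
        // Same approach as the quoted code: drop "jdbc:" so that
        // java.net.URI sees a URI of the form "snowflake://host:port/...".
        URI uri = new URI(url.substring(url.indexOf("snowflake:")));
        String path = uri.getPath();
        return path == null || path.isEmpty() || "/".equals(path);
    }

    public static void main(String[] args) throws URISyntaxException {
        String base = "jdbc:snowflake://wq49212.ap-southeast-1.snowflakecomputing.com:443";
        // Database appended as a path segment: path is "/DH_PROD".
        System.out.println(passesPathCheck(base + "/DH_PROD"));
        // Database passed as a query parameter: path is "/".
        System.out.println(passesPathCheck(base + "/?db=DH_PROD"));
        // No path at all: path is the empty string.
        System.out.println(passesPathCheck(base));
    }
}
```

Only the first form, where the database name is appended as a path segment, is rejected by this check.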

【P2】【UI】Cannot create a SQL node with the Snowflake source type in a workflow.

Looking at the 3.2.x source code: https://github.com/apache/dolphinscheduler/blob/3.2.2-release/dolphinscheduler-ui/src/service/modules/data-source/types.ts Snowflake is defined in IDataBase but not in IDataBaseLabel. So even if a Snowflake datasource is created successfully, it cannot be used normally.

What you expected to happen

Based on the exception above and the source code trace, Snowflake data source operations are confirmed to be broken, in both the UI and the backend.

The Snowflake driver version in the 3.2.x pom:

https://github.com/apache/dolphinscheduler/blob/3.2.2-release/dolphinscheduler-bom/pom.xml

<snowflake-jdbc.version>3.13.29</snowflake-jdbc.version>

Changing the Snowflake driver to any version in 3.13.29 ~ 3.19.x gives the same problem.

How to reproduce

The problem reproduces with Snowflake client driver 3.13.x ~ 3.19.x and DolphinScheduler 3.2.x. Calling the Snowflake driver's parse method locally with the jdbcUrl from the log gives the same result.
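Since the quoted parse method rejects any non-empty path but does read query parameters, one possible direction is to pass the database through the driver's db connection parameter instead of a path segment. The following is only a sketch under that assumption: the class SnowflakeUrlSketch and the method buildJdbcUrl are hypothetical and are not DolphinScheduler's actual code, and whether the eventual PR should take this approach is an open question.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper, not DolphinScheduler code: builds a Snowflake JDBC URL
// that carries the database as the "db" query parameter rather than as a path.
class SnowflakeUrlSketch {

    // address is expected in the form "jdbc:snowflake://host:port".
    static String buildJdbcUrl(String address, String database) {
        String base = address.endsWith("/") ? address : address + "/";
        if (database == null || database.isEmpty()) {
            return base; // a path of "/" passes the quoted path check
        }
        try {
            // The quoted parse method decodes parameters with UTF-8,
            // so encode the value the same way.
            return base + "?db=" + URLEncoder.encode(database, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a compliant JVM.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(buildJdbcUrl(
            "jdbc:snowflake://apo149212.ap-southeast-1.snowflakecomputing.com:443",
            "DH_PROD"));
    }
}
```

With the address from the log, this yields a URL whose path is "/" and whose query string is db=DH_PROD, which the quoted parse method stores under the key DB.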

Anything else

No response

Version

3.2.x

davidzollo commented 1 week ago

Hi @sylphy-zh, I have assigned this issue to you, you can submit a PR to solve it.

sylphy-zh commented 6 days ago

ok, I'll try to fix it later if possible