confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Avro struct support doesn't work with twitter schema #1536

rmoff closed 6 years ago

rmoff commented 6 years ago

KSQL 5.0.0-SNAPSHOT (build 50)

I tried out the new nested Avro support with a view to updating this blog to use it, but creating the stream fails:

ksql> create stream twitter with (kafka_topic='twitter_avro_01',value_format='avro');
 Unable to verify the AVRO schema is compatible with KSQL. Map key must be of type STRING
ksql>
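The value schema further down in this report has a `Sizes` field encoded as an array of `io.confluent.connect.avro.MapEntry` records with an `int` key, which is how the Connect Avro converter represents a map whose keys are not strings. That may well be what trips the check above, though that reading is an assumption and not something confirmed in this issue. A minimal Python sketch for locating such fields in an Avro schema follows; the function names and the trimmed-down `sample` schema are hypothetical and only modelled on the real one.

```python
MAP_ENTRY = "io.confluent.connect.avro.MapEntry"


def full_name(schema):
    """Return namespace.name for a named Avro type (name alone if no namespace)."""
    name = schema.get("name", "")
    namespace = schema.get("namespace")
    return f"{namespace}.{name}" if namespace and "." not in name else name


def key_types(avro_type):
    """Collect the primitive type names a key can take, flattening unions."""
    if isinstance(avro_type, list):
        return {name for branch in avro_type for name in key_types(branch)}
    if isinstance(avro_type, dict):
        return key_types(avro_type.get("type"))
    return {avro_type}


def find_non_string_map_keys(schema, path=""):
    """Yield (path, key types) for every MapEntry record whose key is not a string."""
    if isinstance(schema, list):  # union: inspect each branch
        for branch in schema:
            yield from find_non_string_map_keys(branch, path)
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind == "record":
            fields = schema.get("fields", [])
            if full_name(schema) == MAP_ENTRY:
                key = next((f for f in fields if f["name"] == "key"), None)
                if key is not None:
                    non_null = key_types(key["type"]) - {"null"}
                    if non_null != {"string"}:
                        yield path, sorted(non_null)
            for field in fields:
                child = f"{path}.{field['name']}" if path else field["name"]
                yield from find_non_string_map_keys(field["type"], child)
        elif kind == "array":
            yield from find_non_string_map_keys(schema["items"], path + "[]")
        elif kind == "map":
            yield from find_non_string_map_keys(schema["values"], path + "{}")
        elif isinstance(kind, (dict, list)):  # {"type": {...}} wrapper
            yield from find_non_string_map_keys(kind, path)


# Hypothetical, simplified schema: the real "value" type is a Size record.
sample = {
    "type": "record",
    "name": "MediaEntity",
    "fields": [
        {"name": "Id", "type": ["null", "long"], "default": None},
        {"name": "Sizes", "type": {"type": "array", "items": {
            "type": "record",
            "name": "MapEntry",
            "namespace": "io.confluent.connect.avro",
            "fields": [
                {"name": "key", "type": "int"},
                {"name": "value", "type": "string"},
            ],
        }}},
    ],
}

hits = list(find_non_string_map_keys(sample))
print(hits)
```

Run against the full value schema (loaded with `json.load`), this should list every path where a non-string map key appears, which narrows down which fields to look at.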

The data comes from Kafka Connect, using @jcustenborder's Twitter source connector.

Kafka Connect config:

{
  "name": "twitter_source_avro_01",
  "config": {
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "kafka.delete.topic": "twitter_deletes_avro_01",
    "twitter.oauth.consumerKey": "XXXX",
    "twitter.oauth.consumerSecret": "XXXX",
    "twitter.oauth.accessToken": "XXXX",
    "twitter.oauth.accessTokenSecret": "XXXX",
    "kafka.status.topic": "twitter_avro_01",
    "connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
    "process.deletes": true,
    "filter.keywords": "never,gonna,give,you,up"
  }
}
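For anyone reproducing this, a config of this shape is normally submitted to the Kafka Connect REST API with `POST /connectors`. The Python sketch below only builds the request. The worker address `http://localhost:8083` is an assumption (it is not given in this report), `build_create_request` is a hypothetical helper, and the config is abbreviated.

```python
import json
import urllib.request


def build_create_request(connector, base_url="http://localhost:8083"):
    """Build a POST /connectors request; base_url is an assumed worker address."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Abbreviated copy of the config above; credentials and other keys omitted.
connector = {
    "name": "twitter_source_avro_01",
    "config": {
        "connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "kafka.status.topic": "twitter_avro_01",
    },
}

request = build_create_request(connector)
print(request.get_method(), request.full_url)

# To actually create the connector (requires a running Connect worker):
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```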

Sample message:

ksql> print 'twitter_avro_01';
Format:AVRO
05/07/18 11:12:51 BST, ��������, {"CreatedAt": 1530785571000, "Id": 1014814356629086208, "Text": "@cyberomin Did Amazon start AWS because they had excess infra, or did they build the extra infra to accommodate their AWS plan?\n\nAWS wasn't a happenstance, it was well planned.", "Source": "<a href=\"http://twitter.com/download/android\" rel=\"nofollow\">Twitter for Android</a>", "Truncated": true, "InReplyToStatusId": 1014745992007122944, "InReplyToUserId": 58825393, "InReplyToScreenName": "cyberomin", "GeoLocation": null, "Place": null, "Favorited": false, "Retweeted": false, "FavoriteCount": 0, "User": {"Id": 886892915196387328, "Name": "JJ Sankara", "ScreenName": "uberJJ", "Location": "Nigeria", "Description": "Raconteur.", "ContributorsEnabled": false, "ProfileImageURL": "http://pbs.twimg.com/profile_images/938825898438008833/HCMICbd5_normal.jpg", "BiggerProfileImageURL": "http://pbs.twimg.com/profile_images/938825898438008833/HCMICbd5_bigger.jpg", "MiniProfileImageURL": "http://pbs.twimg.com/profile_images/938825898438008833/HCMICbd5_mini.jpg", "OriginalProfileImageURL": "http://pbs.twimg.com/profile_images/938825898438008833/HCMICbd5.jpg", "ProfileImageURLHttps": "https://pbs.twimg.com/profile_images/938825898438008833/HCMICbd5_normal.jpg", "BiggerProfileImageURLHttps": "https://pbs.twimg.com/profile_images/938825898438008833/HCMICbd5_bigger.jpg", "MiniProfileImageURLHttps": "https://pbs.twimg.com/profile_images/938825898438008833/HCMICbd5_mini.jpg", "OriginalProfileImageURLHttps": "https://pbs.twimg.com/profile_images/938825898438008833/HCMICbd5.jpg", "DefaultProfileImage": false, "URL": "http://medium.com/@jogbojogbo", "Protected": false, "FollowersCount": 542, "ProfileBackgroundColor": "000000", "ProfileTextColor": "000000", "ProfileLinkColor": "000000", "ProfileSidebarFillColor": "000000", "ProfileSidebarBorderColor": "000000", "ProfileUseBackgroundImage": false, "DefaultProfile": false, "ShowAllInlineMedia": false, "FriendsCount": 272, "CreatedAt": 1500286723000, "FavouritesCount": 578, "UtcOffset": -1, "TimeZone": null, "ProfileBackgroundImageURL": "http://abs.twimg.com/images/themes/theme1/bg.png", "ProfileBackgroundImageUrlHttps": "https://abs.twimg.com/images/themes/theme1/bg.png", "ProfileBannerURL": "https://pbs.twimg.com/profile_banners/886892915196387328/1505692115/web", "ProfileBannerRetinaURL": "https://pbs.twimg.com/profile_banners/886892915196387328/1505692115/web_retina", "ProfileBannerIPadURL": "https://pbs.twimg.com/profile_banners/886892915196387328/1505692115/ipad", "ProfileBannerIPadRetinaURL": "https://pbs.twimg.com/profile_banners/886892915196387328/1505692115/ipad_retina", "ProfileBannerMobileURL": "https://pbs.twimg.com/profile_banners/886892915196387328/1505692115/mobile", "ProfileBannerMobileRetinaURL": "https://pbs.twimg.com/profile_banners/886892915196387328/1505692115/mobile_retina", "ProfileBackgroundTiled": false, "Lang": "en", "StatusesCount": 3912, "GeoEnabled": false, "Verified": false, "Translator": false, "ListedCount": 2, "FollowRequestSent": false, "WithheldInCountries": []}, "Retweet": false, "Contributors": [], "RetweetCount": 0, "RetweetedByMe": false, "CurrentUserRetweetId": -1, "PossiblySensitive": false, "Lang": "en", "WithheldInCountries": [], "HashtagEntities": [], "UserMentionEntities": [{"Name": "Celestine Omin", "Id": 58825393, "Text": "cyberomin", "ScreenName": "cyberomin", "Start": 0, "End": 10}], "MediaEntities": [], "SymbolEntities": [], "URLEntities": []}
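The value schema that follows can be pulled from Schema Registry, which exposes the latest version of a subject at `/subjects/<subject>/versions/latest` and returns the schema as a JSON-encoded string in the `schema` field. A small Python sketch, assuming the subject uses the default `<topic>-value` naming and the registry URL from the connector config; both helper names and the `canned` response are hypothetical.

```python
import json
from urllib.parse import quote


def latest_schema_url(topic, registry="http://localhost:8081"):
    """URL of the latest value schema, assuming the default <topic>-value subject."""
    subject = f"{topic}-value"
    return f"{registry}/subjects/{quote(subject, safe='')}/versions/latest"


def parse_schema_response(payload):
    """The registry wraps the Avro schema as a string inside a JSON document."""
    return json.loads(json.loads(payload)["schema"])


url = latest_schema_url("twitter_avro_01")
print(url)

# Canned response for illustration only; a real one comes from an HTTP GET on `url`.
canned = json.dumps({
    "subject": "twitter_avro_01-value",
    "version": 1,
    "id": 1,
    "schema": json.dumps({"type": "record", "name": "Status", "fields": []}),
})
schema = parse_schema_response(canned)
print(schema["name"])
```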

Value Schema:

{
  "type": "record",
  "name": "Status",
  "namespace": "com.github.jcustenborder.kafka.connect.twitter",
  "fields": [
    {
      "name": "CreatedAt",
      "type": [
        "null",
        {
          "type": "long",
          "connect.doc": "Return the created_at",
          "connect.version": 1,
          "connect.name": "org.apache.kafka.connect.data.Timestamp",
          "logicalType": "timestamp-millis"
        }
      ],
      "doc": "Return the created_at",
      "default": null
    },
    {
      "name": "Id",
      "type": [
        "null",
        {
          "type": "long",
          "connect.doc": "Returns the id of the status"
        }
      ],
      "doc": "Returns the id of the status",
      "default": null
    },
    {
      "name": "Text",
      "type": [
        "null",
        {
          "type": "string",
          "connect.doc": "Returns the text of the status"
        }
      ],
      "doc": "Returns the text of the status",
      "default": null
    },
    {
      "name": "Source",
      "type": [
        "null",
        {
          "type": "string",
          "connect.doc": "Returns the source"
        }
      ],
      "doc": "Returns the source",
      "default": null
    },
    {
      "name": "Truncated",
      "type": [
        "null",
        {
          "type": "boolean",
          "connect.doc": "Test if the status is truncated"
        }
      ],
      "doc": "Test if the status is truncated",
      "default": null
    },
    {
      "name": "InReplyToStatusId",
      "type": [
        "null",
        {
          "type": "long",
          "connect.doc": "Returns the in_reply_tostatus_id"
        }
      ],
      "doc": "Returns the in_reply_tostatus_id",
      "default": null
    },
    {
      "name": "InReplyToUserId",
      "type": [
        "null",
        {
          "type": "long",
          "connect.doc": "Returns the in_reply_user_id"
        }
      ],
      "doc": "Returns the in_reply_user_id",
      "default": null
    },
    {
      "name": "InReplyToScreenName",
      "type": [
        "null",
        {
          "type": "string",
          "connect.doc": "Returns the in_reply_to_screen_name"
        }
      ],
      "doc": "Returns the in_reply_to_screen_name",
      "default": null
    },
    {
      "name": "GeoLocation",
      "type": [
        "null",
        {
          "type": "record",
          "name": "GeoLocation",
          "fields": [
            {
              "name": "Latitude",
              "type": {
                "type": "double",
                "connect.doc": "returns the latitude of the geo location"
              },
              "doc": "returns the latitude of the geo location"
            },
            {
              "name": "Longitude",
              "type": {
                "type": "double",
                "connect.doc": "returns the longitude of the geo location"
              },
              "doc": "returns the longitude of the geo location"
            }
          ],
          "connect.doc": "Returns The location that this tweet refers to if available.",
          "connect.name": "com.github.jcustenborder.kafka.connect.twitter.GeoLocation"
        }
      ],
      "doc": "Returns The location that this tweet refers to if available.",
      "default": null
    },
    {
      "name": "Place",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Place",
          "fields": [
            {
              "name": "Name",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "StreetAddress",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "CountryCode",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "Id",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "Country",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "PlaceType",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "URL",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "FullName",
              "type": [
                "null",
                "string"
              ],
              "default": null
            }
          ],
          "connect.doc": "Returns the place attached to this status",
          "connect.name": "com.github.jcustenborder.kafka.connect.twitter.Place"
        }
      ],
      "doc": "Returns the place attached to this status",
      "default": null
    },
    {
      "name": "Favorited",
      "type": [
        "null",
        {
          "type": "boolean",
          "connect.doc": "Test if the status is favorited"
        }
      ],
      "doc": "Test if the status is favorited",
      "default": null
    },
    {
      "name": "Retweeted",
      "type": [
        "null",
        {
          "type": "boolean",
          "connect.doc": "Test if the status is retweeted"
        }
      ],
      "doc": "Test if the status is retweeted",
      "default": null
    },
    {
      "name": "FavoriteCount",
      "type": [
        "null",
        {
          "type": "int",
          "connect.doc": "Indicates approximately how many times this Tweet has been 'favorited' by Twitter users."
        }
      ],
      "doc": "Indicates approximately how many times this Tweet has been 'favorited' by Twitter users.",
      "default": null
    },
    {
      "name": "User",
      "type": {
        "type": "record",
        "name": "User",
        "fields": [
          {
            "name": "Id",
            "type": [
              "null",
              {
                "type": "long",
                "connect.doc": "Returns the id of the user"
              }
            ],
            "doc": "Returns the id of the user",
            "default": null
          },
          {
            "name": "Name",
            "type": [
              "null",
              {
                "type": "string",
                "connect.doc": "Returns the name of the user"
              }
            ],
            "doc": "Returns the name of the user",
            "default": null
          },
          {
            "name": "ScreenName",
            "type": [
              "null",
              {
                "type": "string",
                "connect.doc": "Returns the screen name of the user"
              }
            ],
            "doc": "Returns the screen name of the user",
            "default": null
          },
          {
            "name": "Location",
            "type": [
              "null",
              {
                "type": "string",
                "connect.doc": "Returns the location of the user"
              }
            ],
            "doc": "Returns the location of the user",
            "default": null
          },
          {
            "name": "Description",
            "type": [
              "null",
              {
                "type": "string",
                "connect.doc": "Returns the description of the user"
              }
            ],
            "doc": "Returns the description of the user",
            "default": null
          },
          {
            "name": "ContributorsEnabled",
            "type": [
              "null",
              {
                "type": "boolean",
                "connect.doc": "Tests if the user is enabling contributors"
              }
            ],
            "doc": "Tests if the user is enabling contributors",
            "default": null
          },
          {
            "name": "ProfileImageURL",
            "type": [
              "null",
              {
                "type": "string",
                "connect.doc": "Returns the profile image url of the user"
              }
            ],
            "doc": "Returns the profile image url of the user",
            "default": null
          },
          {
            "name": "BiggerProfileImageURL",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "MiniProfileImageURL",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "OriginalProfileImageURL",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileImageURLHttps",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "BiggerProfileImageURLHttps",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "MiniProfileImageURLHttps",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "OriginalProfileImageURLHttps",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "DefaultProfileImage",
            "type": [
              "null",
              {
                "type": "boolean",
                "connect.doc": "Tests if the user has not uploaded their own avatar"
              }
            ],
            "doc": "Tests if the user has not uploaded their own avatar",
            "default": null
          },
          {
            "name": "URL",
            "type": [
              "null",
              {
                "type": "string",
                "connect.doc": "Returns the url of the user"
              }
            ],
            "doc": "Returns the url of the user",
            "default": null
          },
          {
            "name": "Protected",
            "type": [
              "null",
              {
                "type": "boolean",
                "connect.doc": "Test if the user status is protected"
              }
            ],
            "doc": "Test if the user status is protected",
            "default": null
          },
          {
            "name": "FollowersCount",
            "type": [
              "null",
              {
                "type": "int",
                "connect.doc": "Returns the number of followers"
              }
            ],
            "doc": "Returns the number of followers",
            "default": null
          },
          {
            "name": "ProfileBackgroundColor",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileTextColor",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileLinkColor",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileSidebarFillColor",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileSidebarBorderColor",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileUseBackgroundImage",
            "type": [
              "null",
              "boolean"
            ],
            "default": null
          },
          {
            "name": "DefaultProfile",
            "type": [
              "null",
              {
                "type": "boolean",
                "connect.doc": "Tests if the user has not altered the theme or background"
              }
            ],
            "doc": "Tests if the user has not altered the theme or background",
            "default": null
          },
          {
            "name": "ShowAllInlineMedia",
            "type": [
              "null",
              "boolean"
            ],
            "default": null
          },
          {
            "name": "FriendsCount",
            "type": [
              "null",
              {
                "type": "int",
                "connect.doc": "Returns the number of users the user follows (AKA 'followings')"
              }
            ],
            "doc": "Returns the number of users the user follows (AKA 'followings')",
            "default": null
          },
          {
            "name": "CreatedAt",
            "type": [
              "null",
              {
                "type": "long",
                "connect.version": 1,
                "connect.name": "org.apache.kafka.connect.data.Timestamp",
                "logicalType": "timestamp-millis"
              }
            ],
            "default": null
          },
          {
            "name": "FavouritesCount",
            "type": [
              "null",
              "int"
            ],
            "default": null
          },
          {
            "name": "UtcOffset",
            "type": [
              "null",
              "int"
            ],
            "default": null
          },
          {
            "name": "TimeZone",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileBackgroundImageURL",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileBackgroundImageUrlHttps",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileBannerURL",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileBannerRetinaURL",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileBannerIPadURL",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileBannerIPadRetinaURL",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileBannerMobileURL",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileBannerMobileRetinaURL",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "ProfileBackgroundTiled",
            "type": [
              "null",
              "boolean"
            ],
            "default": null
          },
          {
            "name": "Lang",
            "type": [
              "null",
              {
                "type": "string",
                "connect.doc": "Returns the preferred language of the user"
              }
            ],
            "doc": "Returns the preferred language of the user",
            "default": null
          },
          {
            "name": "StatusesCount",
            "type": [
              "null",
              "int"
            ],
            "default": null
          },
          {
            "name": "GeoEnabled",
            "type": [
              "null",
              "boolean"
            ],
            "default": null
          },
          {
            "name": "Verified",
            "type": [
              "null",
              "boolean"
            ],
            "default": null
          },
          {
            "name": "Translator",
            "type": [
              "null",
              "boolean"
            ],
            "default": null
          },
          {
            "name": "ListedCount",
            "type": [
              "null",
              {
                "type": "int",
                "connect.doc": "Returns the number of public lists the user is listed on, or -1 if the count is unavailable."
              }
            ],
            "doc": "Returns the number of public lists the user is listed on, or -1 if the count is unavailable.",
            "default": null
          },
          {
            "name": "FollowRequestSent",
            "type": [
              "null",
              {
                "type": "boolean",
                "connect.doc": "Returns true if the authenticating user has requested to follow this user, otherwise false."
              }
            ],
            "doc": "Returns true if the authenticating user has requested to follow this user, otherwise false.",
            "default": null
          },
          {
            "name": "WithheldInCountries",
            "type": {
              "type": "array",
              "items": "string",
              "connect.doc": "Returns the list of country codes where the user is withheld"
            },
            "doc": "Returns the list of country codes where the user is withheld"
          }
        ],
        "connect.doc": "Return the user associated with the status. This can be null if the instance is from User.getStatus().",
        "connect.name": "com.github.jcustenborder.kafka.connect.twitter.User"
      },
      "doc": "Return the user associated with the status. This can be null if the instance is from User.getStatus()."
    },
    {
      "name": "Retweet",
      "type": [
        "null",
        "boolean"
      ],
      "default": null
    },
    {
      "name": "Contributors",
      "type": {
        "type": "array",
        "items": "long",
        "connect.doc": "Returns an array of contributors, or null if no contributor is associated with this status."
      },
      "doc": "Returns an array of contributors, or null if no contributor is associated with this status."
    },
    {
      "name": "RetweetCount",
      "type": [
        "null",
        {
          "type": "int",
          "connect.doc": "Returns the number of times this tweet has been retweeted, or -1 when the tweet was created before this feature was enabled."
        }
      ],
      "doc": "Returns the number of times this tweet has been retweeted, or -1 when the tweet was created before this feature was enabled.",
      "default": null
    },
    {
      "name": "RetweetedByMe",
      "type": [
        "null",
        "boolean"
      ],
      "default": null
    },
    {
      "name": "CurrentUserRetweetId",
      "type": [
        "null",
        {
          "type": "long",
          "connect.doc": "Returns the authenticating user's retweet's id of this tweet, or -1L when the tweet was created before this feature was enabled."
        }
      ],
      "doc": "Returns the authenticating user's retweet's id of this tweet, or -1L when the tweet was created before this feature was enabled.",
      "default": null
    },
    {
      "name": "PossiblySensitive",
      "type": [
        "null",
        "boolean"
      ],
      "default": null
    },
    {
      "name": "Lang",
      "type": [
        "null",
        {
          "type": "string",
          "connect.doc": "Returns the lang of the status text if available."
        }
      ],
      "doc": "Returns the lang of the status text if available.",
      "default": null
    },
    {
      "name": "WithheldInCountries",
      "type": {
        "type": "array",
        "items": "string",
        "connect.doc": "Returns the list of country codes where the tweet is withheld"
      },
      "doc": "Returns the list of country codes where the tweet is withheld"
    },
    {
      "name": "HashtagEntities",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "HashtagEntity",
            "fields": [
              {
                "name": "Text",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the text of the hashtag without #."
                  }
                ],
                "doc": "Returns the text of the hashtag without #.",
                "default": null
              },
              {
                "name": "Start",
                "type": [
                  "null",
                  {
                    "type": "int",
                    "connect.doc": "Returns the index of the start character of the hashtag."
                  }
                ],
                "doc": "Returns the index of the start character of the hashtag.",
                "default": null
              },
              {
                "name": "End",
                "type": [
                  "null",
                  {
                    "type": "int",
                    "connect.doc": "Returns the index of the end character of the hashtag."
                  }
                ],
                "doc": "Returns the index of the end character of the hashtag.",
                "default": null
              }
            ],
            "connect.doc": "",
            "connect.name": "com.github.jcustenborder.kafka.connect.twitter.HashtagEntity"
          },
          "connect.doc": "Returns an array if hashtag mentioned in the tweet."
        }
      ],
      "doc": "Returns an array if hashtag mentioned in the tweet.",
      "default": null
    },
    {
      "name": "UserMentionEntities",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "UserMentionEntity",
            "fields": [
              {
                "name": "Name",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the name mentioned in the status."
                  }
                ],
                "doc": "Returns the name mentioned in the status.",
                "default": null
              },
              {
                "name": "Id",
                "type": [
                  "null",
                  {
                    "type": "long",
                    "connect.doc": "Returns the user id mentioned in the status."
                  }
                ],
                "doc": "Returns the user id mentioned in the status.",
                "default": null
              },
              {
                "name": "Text",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the screen name mentioned in the status."
                  }
                ],
                "doc": "Returns the screen name mentioned in the status.",
                "default": null
              },
              {
                "name": "ScreenName",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the screen name mentioned in the status."
                  }
                ],
                "doc": "Returns the screen name mentioned in the status.",
                "default": null
              },
              {
                "name": "Start",
                "type": [
                  "null",
                  {
                    "type": "int",
                    "connect.doc": "Returns the index of the start character of the user mention."
                  }
                ],
                "doc": "Returns the index of the start character of the user mention.",
                "default": null
              },
              {
                "name": "End",
                "type": [
                  "null",
                  {
                    "type": "int",
                    "connect.doc": "Returns the index of the end character of the user mention."
                  }
                ],
                "doc": "Returns the index of the end character of the user mention.",
                "default": null
              }
            ],
            "connect.doc": "",
            "connect.name": "com.github.jcustenborder.kafka.connect.twitter.UserMentionEntity"
          },
          "connect.doc": "Returns an array of user mentions in the tweet."
        }
      ],
      "doc": "Returns an array of user mentions in the tweet.",
      "default": null
    },
    {
      "name": "MediaEntities",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "MediaEntity",
            "fields": [
              {
                "name": "Id",
                "type": [
                  "null",
                  {
                    "type": "long",
                    "connect.doc": "Returns the id of the media."
                  }
                ],
                "doc": "Returns the id of the media.",
                "default": null
              },
              {
                "name": "Type",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the media type photo, video, animated_gif."
                  }
                ],
                "doc": "Returns the media type photo, video, animated_gif.",
                "default": null
              },
              {
                "name": "MediaURL",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the media URL."
                  }
                ],
                "doc": "Returns the media URL.",
                "default": null
              },
              {
                "name": "Sizes",
                "type": {
                  "type": "array",
                  "items": {
                    "type": "record",
                    "name": "MapEntry",
                    "namespace": "io.confluent.connect.avro",
                    "fields": [
                      {
                        "name": "key",
                        "type": "int"
                      },
                      {
                        "name": "value",
                        "type": {
                          "type": "record",
                          "name": "Size",
                          "namespace": "com.github.jcustenborder.kafka.connect.twitter.MediaEntity",
                          "fields": [
                            {
                              "name": "Resize",
                              "type": [
                                "null",
                                {
                                  "type": "int",
                                  "connect.doc": ""
                                }
                              ],
                              "doc": "",
                              "default": null
                            },
                            {
                              "name": "Width",
                              "type": [
                                "null",
                                {
                                  "type": "int",
                                  "connect.doc": ""
                                }
                              ],
                              "doc": "",
                              "default": null
                            },
                            {
                              "name": "Height",
                              "type": [
                                "null",
                                {
                                  "type": "int",
                                  "connect.doc": ""
                                }
                              ],
                              "doc": "",
                              "default": null
                            }
                          ],
                          "connect.doc": "",
                          "connect.name": "com.github.jcustenborder.kafka.connect.twitter.MediaEntity.Size"
                        },
                        "doc": ""
                      }
                    ]
                  }
                }
              },
              {
                "name": "MediaURLHttps",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the media secure URL."
                  }
                ],
                "doc": "Returns the media secure URL.",
                "default": null
              },
              {
                "name": "VideoAspectRatioWidth",
                "type": [
                  "null",
                  {
                    "type": "int",
                    "connect.doc": ""
                  }
                ],
                "doc": "",
                "default": null
              },
              {
                "name": "VideoAspectRatioHeight",
                "type": [
                  "null",
                  {
                    "type": "int",
                    "connect.doc": ""
                  }
                ],
                "doc": "",
                "default": null
              },
              {
                "name": "VideoDurationMillis",
                "type": [
                  "null",
                  {
                    "type": "long",
                    "connect.doc": ""
                  }
                ],
                "doc": "",
                "default": null
              },
              {
                "name": "VideoVariants",
                "type": [
                  "null",
                  {
                    "type": "array",
                    "items": {
                      "type": "record",
                      "name": "Variant",
                      "namespace": "com.github.jcustenborder.kafka.connect.twitter.ExtendedMediaEntity",
                      "fields": [
                        {
                          "name": "Url",
                          "type": [
                            "null",
                            {
                              "type": "string",
                              "connect.doc": ""
                            }
                          ],
                          "doc": "",
                          "default": null
                        },
                        {
                          "name": "Bitrate",
                          "type": [
                            "null",
                            {
                              "type": "int",
                              "connect.doc": ""
                            }
                          ],
                          "doc": "",
                          "default": null
                        },
                        {
                          "name": "ContentType",
                          "type": [
                            "null",
                            {
                              "type": "string",
                              "connect.doc": ""
                            }
                          ],
                          "doc": "",
                          "default": null
                        }
                      ],
                      "connect.doc": "",
                      "connect.name": "com.github.jcustenborder.kafka.connect.twitter.ExtendedMediaEntity.Variant"
                    },
                    "connect.doc": "Returns size variations of the media."
                  }
                ],
                "doc": "Returns size variations of the media.",
                "default": null
              },
              {
                "name": "ExtAltText",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": ""
                  }
                ],
                "doc": "",
                "default": null
              },
              {
                "name": "URL",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the URL mentioned in the tweet."
                  }
                ],
                "doc": "Returns the URL mentioned in the tweet.",
                "default": null
              },
              {
                "name": "Text",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the URL mentioned in the tweet."
                  }
                ],
                "doc": "Returns the URL mentioned in the tweet.",
                "default": null
              },
              {
                "name": "ExpandedURL",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the expanded URL if mentioned URL is shorten."
                  }
                ],
                "doc": "Returns the expanded URL if mentioned URL is shorten.",
                "default": null
              },
              {
                "name": "Start",
                "type": [
                  "null",
                  {
                    "type": "int",
                    "connect.doc": "Returns the index of the start character of the URL mentioned in the tweet."
                  }
                ],
                "doc": "Returns the index of the start character of the URL mentioned in the tweet.",
                "default": null
              },
              {
                "name": "End",
                "type": [
                  "null",
                  {
                    "type": "int",
                    "connect.doc": "Returns the index of the end character of the URL mentioned in the tweet."
                  }
                ],
                "doc": "Returns the index of the end character of the URL mentioned in the tweet.",
                "default": null
              },
              {
                "name": "DisplayURL",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the display URL if mentioned URL is shorten."
                  }
                ],
                "doc": "Returns the display URL if mentioned URL is shorten.",
                "default": null
              }
            ],
            "connect.doc": "",
            "connect.name": "com.github.jcustenborder.kafka.connect.twitter.MediaEntity"
          },
          "connect.doc": "Returns an array of MediaEntities if medias are available in the tweet."
        }
      ],
      "doc": "Returns an array of MediaEntities if medias are available in the tweet.",
      "default": null
    },
    {
      "name": "SymbolEntities",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "SymbolEntity",
            "fields": [
              {
                "name": "Start",
                "type": [
                  "null",
                  {
                    "type": "int",
                    "connect.doc": "Returns the index of the start character of the symbol."
                  }
                ],
                "doc": "Returns the index of the start character of the symbol.",
                "default": null
              },
              {
                "name": "End",
                "type": [
                  "null",
                  {
                    "type": "int",
                    "connect.doc": "Returns the index of the end character of the symbol."
                  }
                ],
                "doc": "Returns the index of the end character of the symbol.",
                "default": null
              },
              {
                "name": "Text",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the text of the entity"
                  }
                ],
                "doc": "Returns the text of the entity",
                "default": null
              }
            ],
            "connect.doc": "",
            "connect.name": "com.github.jcustenborder.kafka.connect.twitter.SymbolEntity"
          },
          "connect.doc": "Returns an array of SymbolEntities if medias are available in the tweet."
        }
      ],
      "doc": "Returns an array of SymbolEntities if medias are available in the tweet.",
      "default": null
    },
    {
      "name": "URLEntities",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "URLEntity",
            "fields": [
              {
                "name": "URL",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the URL mentioned in the tweet."
                  }
                ],
                "doc": "Returns the URL mentioned in the tweet.",
                "default": null
              },
              {
                "name": "Text",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the URL mentioned in the tweet."
                  }
                ],
                "doc": "Returns the URL mentioned in the tweet.",
                "default": null
              },
              {
                "name": "ExpandedURL",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the expanded URL if mentioned URL is shorten."
                  }
                ],
                "doc": "Returns the expanded URL if mentioned URL is shorten.",
                "default": null
              },
              {
                "name": "Start",
                "type": [
                  "null",
                  {
                    "type": "int",
                    "connect.doc": "Returns the index of the start character of the URL mentioned in the tweet."
                  }
                ],
                "doc": "Returns the index of the start character of the URL mentioned in the tweet.",
                "default": null
              },
              {
                "name": "End",
                "type": [
                  "null",
                  {
                    "type": "int",
                    "connect.doc": "Returns the index of the end character of the URL mentioned in the tweet."
                  }
                ],
                "doc": "Returns the index of the end character of the URL mentioned in the tweet.",
                "default": null
              },
              {
                "name": "DisplayURL",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "connect.doc": "Returns the display URL if mentioned URL is shorten."
                  }
                ],
                "doc": "Returns the display URL if mentioned URL is shorten.",
                "default": null
              }
            ],
            "connect.doc": "",
            "connect.name": "com.github.jcustenborder.kafka.connect.twitter.URLEntity"
          },
          "connect.doc": "Returns an array if URLEntity mentioned in the tweet."
        }
      ],
      "doc": "Returns an array if URLEntity mentioned in the tweet.",
      "default": null
    }
  ],
  "connect.doc": "Twitter status message.",
  "connect.name": "com.github.jcustenborder.kafka.connect.twitter.Status"
}
rmoff commented 6 years ago

I've also tried this with a STRING key ("key.converter": "org.apache.kafka.connect.storage.StringConverter") and get the same error from KSQL:

{
  "name": "twitter_source_avro_02",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
    "kafka.delete.topic": "twitter_deletes_avro_02",
    "twitter.oauth.consumerKey": "XXXX",
    "twitter.oauth.consumerSecret": "XXXX",
    "twitter.oauth.accessToken": "XXXX",
    "twitter.oauth.accessTokenSecret": "XXXX",
    "kafka.status.topic": "twitter_avro_02",
    "connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
    "process.deletes": true,
    "filter.keywords": "rickastley,rmoff,ksql,confluent,jaykreps,gwenshap,apachekafka,nehanarkhede,kafka streams,kafka connect,kafkasummit,kafka,bacon,aws,ilkley"
  }
}

Sample message (note STRING key):

ksql> print 'twitter_avro_02' from beginning;
Format:AVRO
06/07/18 10:19:46 BST, Struct{Id=1015163384315240448}, {"CreatedAt": 1530868786000, "Id": 1015163384315240448, "Text": "Will be doing lamb burgers with feta and tzaziki\nCrumbed chicken burgers with Asian pickles \nAnd will revive the beef burger with the bacon, blue cheese, bourbon poached pears", "Source": "<a href=\"http://twitter.com/download/android\" rel=\"nofollow\">Twitter for Android</a>", "Truncated": true, "InReplyToStatusId": 1015163372000759808, "InReplyToUserId": 2586931947, "InReplyToScreenName": "cheftakura", "GeoLocation": null, "Place": null, "Favorited": false, "Retweeted": false, "FavoriteCount": 0, "User": {"Id": 2586931947, "Name": "Hotelier ��", "ScreenName": "cheftakura", "Location": "Bvumba and Mutare ", "Description": "Wine, food, cricket, economics, music. Probably in that order", "ContributorsEnabled": false, "ProfileImageURL": "http://pbs.twimg.com/profile_images/977536986041987073/71-SAVTO_normal.jpg", "BiggerProfileImageURL": "http://pbs.twimg.com/profile_images/977536986041987073/71-SAVTO_bigger.jpg", "MiniProfileImageURL": "http://pbs.twimg.com/profile_images/977536986041987073/71-SAVTO_mini.jpg", "OriginalProfileImageURL": "http://pbs.twimg.com/profile_images/977536986041987073/71-SAVTO.jpg", "ProfileImageURLHttps": "https://pbs.twimg.com/profile_images/977536986041987073/71-SAVTO_normal.jpg", "BiggerProfileImageURLHttps": "https://pbs.twimg.com/profile_images/977536986041987073/71-SAVTO_bigger.jpg", "MiniProfileImageURLHttps": "https://pbs.twimg.com/profile_images/977536986041987073/71-SAVTO_mini.jpg", "OriginalProfileImageURLHttps": "https://pbs.twimg.com/profile_images/977536986041987073/71-SAVTO.jpg", "DefaultProfileImage": false, "URL": null, "Protected": false, "FollowersCount": 1990, "ProfileBackgroundColor": "8B542B", "ProfileTextColor": "333333", "ProfileLinkColor": "9D582E", "ProfileSidebarFillColor": "EADEAA", "ProfileSidebarBorderColor": "D9B17E", "ProfileUseBackgroundImage": true, "DefaultProfile": false, 
"ShowAllInlineMedia": false, "FriendsCount": 1503, "CreatedAt": 1402058406000, "FavouritesCount": 57899, "UtcOffset": -1, "TimeZone": null, "ProfileBackgroundImageURL": "http://abs.twimg.com/images/themes/theme8/bg.gif", "ProfileBackgroundImageUrlHttps": "https://abs.twimg.com/images/themes/theme8/bg.gif", "ProfileBannerURL": "https://pbs.twimg.com/profile_banners/2586931947/1493054747/web", "ProfileBannerRetinaURL": "https://pbs.twimg.com/profile_banners/2586931947/1493054747/web_retina", "ProfileBannerIPadURL": "https://pbs.twimg.com/profile_banners/2586931947/1493054747/ipad", "ProfileBannerIPadRetinaURL": "https://pbs.twimg.com/profile_banners/2586931947/1493054747/ipad_retina", "ProfileBannerMobileURL": "https://pbs.twimg.com/profile_banners/2586931947/1493054747/mobile", "ProfileBannerMobileRetinaURL": "https://pbs.twimg.com/profile_banners/2586931947/1493054747/mobile_retina", "ProfileBackgroundTiled": false, "Lang": "en", "StatusesCount": 13593, "GeoEnabled": false, "Verified": false, "Translator": false, "ListedCount": 18, "FollowRequestSent": false, "WithheldInCountries": []}, "Retweet": false, "Contributors": [], "RetweetCount": 0, "RetweetedByMe": false, "CurrentUserRetweetId": -1, "PossiblySensitive": false, "Lang": "en", "WithheldInCountries": [], "HashtagEntities": [], "UserMentionEntities": [], "MediaEntities": [], "SymbolEntities": [], "URLEntities": []}

STREAM still fails to create:

ksql> create stream twitter with (kafka_topic='twitter_avro_02',value_format='avro');
 Unable to verify the AVRO schema is compatible with KSQL. Map key must be of type STRING
ksql>
rmoff commented 6 years ago

Internal note from @rodesai:

This is complaining because the schema has a map with non-string keys. Currently KSQL only handles strings as map keys. Once the schema inference test PR gets merged, this should start working.
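
For anyone wanting to find which field trips this check: the Connect AvroConverter writes a map with non-string keys as an array of `io.confluent.connect.avro.MapEntry` records, which is how the `Sizes` field appears in the schema above (its `key` is an `int`). The sketch below walks a schema and reports such entries. It is only an illustration, not KSQL's actual validation code; `find_non_string_map_keys`, `base_type`, and the trimmed `sample` schema are hypothetical names made up for this example.

```python
MAP_ENTRY = "io.confluent.connect.avro.MapEntry"


def base_type(avro_type):
    """Return the type name for either "int" or {"type": "int", ...}."""
    return avro_type.get("type") if isinstance(avro_type, dict) else avro_type


def find_non_string_map_keys(schema, path=""):
    """Yield (path, key_type) for each MapEntry record whose key is not a string."""
    if isinstance(schema, list):  # union: inspect every branch
        for branch in schema:
            yield from find_non_string_map_keys(branch, path)
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind == "record":
            namespace = schema.get("namespace")
            name = schema.get("name", "")
            full_name = f"{namespace}.{name}" if namespace else name
            fields = {f["name"]: f["type"] for f in schema.get("fields", [])}
            if full_name == MAP_ENTRY and base_type(fields.get("key")) != "string":
                yield path, base_type(fields.get("key"))
            for field_name, field_type in fields.items():
                child = f"{path}.{field_name}" if path else field_name
                yield from find_non_string_map_keys(field_type, child)
        elif kind == "array":
            yield from find_non_string_map_keys(schema["items"], path + "[]")
        elif kind == "map":
            yield from find_non_string_map_keys(schema["values"], path + "{}")
        elif isinstance(kind, (dict, list)):  # nested type definition
            yield from find_non_string_map_keys(kind, path)


# Trimmed-down stand-in for the MediaEntity record in the schema above.
sample = {
    "type": "record",
    "name": "MediaEntity",
    "fields": [
        {"name": "Id", "type": ["null", {"type": "long"}], "default": None},
        {"name": "Sizes", "type": {"type": "array", "items": {
            "type": "record",
            "name": "MapEntry",
            "namespace": "io.confluent.connect.avro",
            "fields": [
                {"name": "key", "type": "int"},
                {"name": "value", "type": {"type": "record", "name": "Size", "fields": []}},
            ],
        }}},
    ],
}

print(list(find_non_string_map_keys(sample)))  # [('Sizes[]', 'int')]
```

To run it against the real schema, load the value schema JSON for the topic with `json.loads` and pass the resulting dict in place of `sample`.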

rmoff commented 6 years ago

@rodesai when's the schema inference test PR due?

To clarify, I'm trying to get this blog (which currently uses JSON only, with VARCHAR/EXTRACTJSONFIELD to navigate the schema) to use Avro natively instead.

rmoff commented 6 years ago

New error, with latest build (rc4):

ksql> create stream tweets with (kafka_topic='twitter_avro',value_format='avro');
io.confluent.ksql.parser.exception.ParseFailedException: line 2:1927: extraneous input 'END' expecting {'ADD', 'APPROXIMATE', 'AT', 'CONFIDENCE', 'NO', 'SUBSTRING', 'POSITION', 'TINYINT', 'SMALLINT', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'STRUCT', 'VIEW', 'REPLACE', 'GRANT', 'REVOKE', 'PRIVILEGES', 'PUBLIC', 'OPTION', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'GRAPHVIZ', 'LOGICAL', 'DISTRIBUTED', 'TRY', 'SHOW', 'TABLES', 'SCHEMAS', 'CATALOGS', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'TO', 'SYSTEM', 'BERNOULLI', 'POISSONIZED', 'TABLESAMPLE', 'RESCALED', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'WORK', 'ISOLATION', 'LEVEL', 'SERIALIZABLE', 'REPEATABLE', 'COMMITTED', 'UNCOMMITTED', 'READ', 'WRITE', 'ONLY', 'CALL', 'NFD', 'NFC', 'NFKD', 'NFKC', 'IF', 'NULLIF', 'COALESCE', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
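
One plausible reading of this error, which nothing in this thread confirms, is that a field name taken from the Avro schema collides with a word the KSQL parser reserves: the schema has fields named `Start` and `End`, and the list of accepted tokens includes 'START' but not 'END'. A minimal sketch for listing such collisions follows. `ASSUMED_RESERVED`, `field_names`, `reserved_collisions`, and the trimmed `sample` schema are hypothetical, and the reserved set contains only the one word suggested by the error, not KSQL's real keyword list.

```python
# Assumption: END is reserved by the parser. This is inferred from the error
# message only; the full reserved-word list is not reproduced here.
ASSUMED_RESERVED = {"END"}


def field_names(schema):
    """Yield every record field name found anywhere in an Avro schema."""
    if isinstance(schema, list):  # union: inspect every branch
        for branch in schema:
            yield from field_names(branch)
    elif isinstance(schema, dict):
        for field in schema.get("fields", []):
            yield field["name"]
            yield from field_names(field["type"])
        for key in ("items", "values"):  # array items / map values
            if key in schema:
                yield from field_names(schema[key])
        if isinstance(schema.get("type"), (dict, list)):  # nested type definition
            yield from field_names(schema["type"])


def reserved_collisions(schema, reserved=ASSUMED_RESERVED):
    """Return the sorted field names whose upper-cased form is in `reserved`."""
    return sorted({name for name in field_names(schema) if name.upper() in reserved})


# Trimmed-down stand-in for the Status schema, keeping only a few fields.
sample = {
    "type": "record",
    "name": "Status",
    "fields": [
        {"name": "Text", "type": ["null", "string"], "default": None},
        {"name": "UserMentionEntities", "type": ["null", {
            "type": "array",
            "items": {
                "type": "record",
                "name": "UserMentionEntity",
                "fields": [
                    {"name": "Start", "type": ["null", "int"], "default": None},
                    {"name": "End", "type": ["null", "int"], "default": None},
                ],
            },
        }], "default": None},
    ],
}

print(reserved_collisions(sample))  # ['End']
```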

Sample message:

{"CreatedAt": 1532985026000, "Id": 1024039540556742656, "Text": "RT @awscloud: Have you scheduled your AWS Certified Cloud Practitioner exam? Validate your skills in cloud fundamentals with an industry-re\u2026", "Source": "<a href=\"http://gaggleamp.com/twit/\" rel=\"nofollow\">GaggleAMP</a>", "Truncated": false, "InReplyToStatusId": -1, "InReplyToUserId": -1, "InReplyToScreenName": null, "GeoLocation": null, "Place": null, "Favorited": false, "Retweeted": false, "FavoriteCount": 0, "User": {"Id": 3069390827, "Name": "Stefan Letz", "ScreenName": "stefletz", "Location": "Seattle, WA", "Description": "I work for @AWSCloud and my opinions are my own.", "ContributorsEnabled": false, "ProfileImageURL": "http://pbs.twimg.com/profile_images/923407091561140224/JyMBenHR_normal.jpg", "BiggerProfileImageURL": "http://pbs.twimg.com/profile_images/923407091561140224/JyMBenHR_bigger.jpg", "MiniProfileImageURL": "http://pbs.twimg.com/profile_images/923407091561140224/JyMBenHR_mini.jpg", "OriginalProfileImageURL": "http://pbs.twimg.com/profile_images/923407091561140224/JyMBenHR.jpg", "ProfileImageURLHttps": "https://pbs.twimg.com/profile_images/923407091561140224/JyMBenHR_normal.jpg", "BiggerProfileImageURLHttps": "https://pbs.twimg.com/profile_images/923407091561140224/JyMBenHR_bigger.jpg", "MiniProfileImageURLHttps": "https://pbs.twimg.com/profile_images/923407091561140224/JyMBenHR_mini.jpg", "OriginalProfileImageURLHttps": "https://pbs.twimg.com/profile_images/923407091561140224/JyMBenHR.jpg", "DefaultProfileImage": false, "URL": null, "Protected": false, "FollowersCount": 32, "ProfileBackgroundColor": "C0DEED", "ProfileTextColor": "333333", "ProfileLinkColor": "1DA1F2", "ProfileSidebarFillColor": "DDEEF6", "ProfileSidebarBorderColor": "C0DEED", "ProfileUseBackgroundImage": true, "DefaultProfile": true, "ShowAllInlineMedia": false, "FriendsCount": 61, "CreatedAt": 1425453770000, "FavouritesCount": 624, "UtcOffset": -1, "TimeZone": null, "ProfileBackgroundImageURL": 
"http://abs.twimg.com/images/themes/theme1/bg.png", "ProfileBackgroundImageUrlHttps": "https://abs.twimg.com/images/themes/theme1/bg.png", "ProfileBannerURL": null, "ProfileBannerRetinaURL": null, "ProfileBannerIPadURL": null, "ProfileBannerIPadRetinaURL": null, "ProfileBannerMobileURL": null, "ProfileBannerMobileRetinaURL": null, "ProfileBackgroundTiled": false, "Lang": "en", "StatusesCount": 986, "GeoEnabled": false, "Verified": false, "Translator": false, "ListedCount": 4, "FollowRequestSent": false, "WithheldInCountries": []}, "Retweet": true, "Contributors": [], "RetweetCount": 0, "RetweetedByMe": false, "CurrentUserRetweetId": -1, "PossiblySensitive": false, "Lang": "en", "WithheldInCountries": [], "HashtagEntities": [], "UserMentionEntities": [{"Name": "Amazon Web Services", "Id": 66780587, "Text": "awscloud", "ScreenName": "awscloud", "Start": 3, "End": 12}], "MediaEntities": [], "SymbolEntities": [], "URLEntities": []}

Schema:

{"type":"record","name":"Status","namespace":"com.github.jcustenborder.kafka.connect.twitter","fields":[{"name":"CreatedAt","type":["null",{"type":"long","connect.doc":"Return the created_at","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"doc":"Return the created_at","default":null},{"name":"Id","type":["null",{"type":"long","connect.doc":"Returns the id of the status"}],"doc":"Returns the id of the status","default":null},{"name":"Text","type":["null",{"type":"string","connect.doc":"Returns the text of the status"}],"doc":"Returns the text of the status","default":null},{"name":"Source","type":["null",{"type":"string","connect.doc":"Returns the source"}],"doc":"Returns the source","default":null},{"name":"Truncated","type":["null",{"type":"boolean","connect.doc":"Test if the status is truncated"}],"doc":"Test if the status is truncated","default":null},{"name":"InReplyToStatusId","type":["null",{"type":"long","connect.doc":"Returns the in_reply_tostatus_id"}],"doc":"Returns the in_reply_tostatus_id","default":null},{"name":"InReplyToUserId","type":["null",{"type":"long","connect.doc":"Returns the in_reply_user_id"}],"doc":"Returns the in_reply_user_id","default":null},{"name":"InReplyToScreenName","type":["null",{"type":"string","connect.doc":"Returns the in_reply_to_screen_name"}],"doc":"Returns the in_reply_to_screen_name","default":null},{"name":"GeoLocation","type":["null",{"type":"record","name":"GeoLocation","fields":[{"name":"Latitude","type":{"type":"double","connect.doc":"returns the latitude of the geo location"},"doc":"returns the latitude of the geo location"},{"name":"Longitude","type":{"type":"double","connect.doc":"returns the longitude of the geo location"},"doc":"returns the longitude of the geo location"}],"connect.doc":"Returns The location that this tweet refers to if available.","connect.name":"com.github.jcustenborder.kafka.connect.twitter.GeoLocation"}],"doc":"Returns The 
location that this tweet refers to if available.","default":null},{"name":"Place","type":["null",{"type":"record","name":"Place","fields":[{"name":"Name","type":["null","string"],"default":null},{"name":"StreetAddress","type":["null","string"],"default":null},{"name":"CountryCode","type":["null","string"],"default":null},{"name":"Id","type":["null","string"],"default":null},{"name":"Country","type":["null","string"],"default":null},{"name":"PlaceType","type":["null","string"],"default":null},{"name":"URL","type":["null","string"],"default":null},{"name":"FullName","type":["null","string"],"default":null}],"connect.doc":"Returns the place attached to this status","connect.name":"com.github.jcustenborder.kafka.connect.twitter.Place"}],"doc":"Returns the place attached to this status","default":null},{"name":"Favorited","type":["null",{"type":"boolean","connect.doc":"Test if the status is favorited"}],"doc":"Test if the status is favorited","default":null},{"name":"Retweeted","type":["null",{"type":"boolean","connect.doc":"Test if the status is retweeted"}],"doc":"Test if the status is retweeted","default":null},{"name":"FavoriteCount","type":["null",{"type":"int","connect.doc":"Indicates approximately how many times this Tweet has been \"favorited\" by Twitter users."}],"doc":"Indicates approximately how many times this Tweet has been \"favorited\" by Twitter users.","default":null},{"name":"User","type":{"type":"record","name":"User","fields":[{"name":"Id","type":["null",{"type":"long","connect.doc":"Returns the id of the user"}],"doc":"Returns the id of the user","default":null},{"name":"Name","type":["null",{"type":"string","connect.doc":"Returns the name of the user"}],"doc":"Returns the name of the user","default":null},{"name":"ScreenName","type":["null",{"type":"string","connect.doc":"Returns the screen name of the user"}],"doc":"Returns the screen name of the user","default":null},{"name":"Location","type":["null",{"type":"string","connect.doc":"Returns the 
location of the user"}],"doc":"Returns the location of the user","default":null},{"name":"Description","type":["null",{"type":"string","connect.doc":"Returns the description of the user"}],"doc":"Returns the description of the user","default":null},{"name":"ContributorsEnabled","type":["null",{"type":"boolean","connect.doc":"Tests if the user is enabling contributors"}],"doc":"Tests if the user is enabling contributors","default":null},{"name":"ProfileImageURL","type":["null",{"type":"string","connect.doc":"Returns the profile image url of the user"}],"doc":"Returns the profile image url of the user","default":null},{"name":"BiggerProfileImageURL","type":["null","string"],"default":null},{"name":"MiniProfileImageURL","type":["null","string"],"default":null},{"name":"OriginalProfileImageURL","type":["null","string"],"default":null},{"name":"ProfileImageURLHttps","type":["null","string"],"default":null},{"name":"BiggerProfileImageURLHttps","type":["null","string"],"default":null},{"name":"MiniProfileImageURLHttps","type":["null","string"],"default":null},{"name":"OriginalProfileImageURLHttps","type":["null","string"],"default":null},{"name":"DefaultProfileImage","type":["null",{"type":"boolean","connect.doc":"Tests if the user has not uploaded their own avatar"}],"doc":"Tests if the user has not uploaded their own avatar","default":null},{"name":"URL","type":["null",{"type":"string","connect.doc":"Returns the url of the user"}],"doc":"Returns the url of the user","default":null},{"name":"Protected","type":["null",{"type":"boolean","connect.doc":"Test if the user status is protected"}],"doc":"Test if the user status is protected","default":null},{"name":"FollowersCount","type":["null",{"type":"int","connect.doc":"Returns the number of followers"}],"doc":"Returns the number of 
followers","default":null},{"name":"ProfileBackgroundColor","type":["null","string"],"default":null},{"name":"ProfileTextColor","type":["null","string"],"default":null},{"name":"ProfileLinkColor","type":["null","string"],"default":null},{"name":"ProfileSidebarFillColor","type":["null","string"],"default":null},{"name":"ProfileSidebarBorderColor","type":["null","string"],"default":null},{"name":"ProfileUseBackgroundImage","type":["null","boolean"],"default":null},{"name":"DefaultProfile","type":["null",{"type":"boolean","connect.doc":"Tests if the user has not altered the theme or background"}],"doc":"Tests if the user has not altered the theme or background","default":null},{"name":"ShowAllInlineMedia","type":["null","boolean"],"default":null},{"name":"FriendsCount","type":["null",{"type":"int","connect.doc":"Returns the number of users the user follows (AKA \"followings\")"}],"doc":"Returns the number of users the user follows (AKA \"followings\")","default":null},{"name":"CreatedAt","type":["null",{"type":"long","connect.version":1,"connect.name":"org.apache.kafka.connect.data.Timestamp","logicalType":"timestamp-millis"}],"default":null},{"name":"FavouritesCount","type":["null","int"],"default":null},{"name":"UtcOffset","type":["null","int"],"default":null},{"name":"TimeZone","type":["null","string"],"default":null},{"name":"ProfileBackgroundImageURL","type":["null","string"],"default":null},{"name":"ProfileBackgroundImageUrlHttps","type":["null","string"],"default":null},{"name":"ProfileBannerURL","type":["null","string"],"default":null},{"name":"ProfileBannerRetinaURL","type":["null","string"],"default":null},{"name":"ProfileBannerIPadURL","type":["null","string"],"default":null},{"name":"ProfileBannerIPadRetinaURL","type":["null","string"],"default":null},{"name":"ProfileBannerMobileURL","type":["null","string"],"default":null},{"name":"ProfileBannerMobileRetinaURL","type":["null","string"],"default":null},{"name":"ProfileBackgroundTiled","type":["null","boolea
n"],"default":null},{"name":"Lang","type":["null",{"type":"string","connect.doc":"Returns the preferred language of the user"}],"doc":"Returns the preferred language of the user","default":null},{"name":"StatusesCount","type":["null","int"],"default":null},{"name":"GeoEnabled","type":["null","boolean"],"default":null},{"name":"Verified","type":["null","boolean"],"default":null},{"name":"Translator","type":["null","boolean"],"default":null},{"name":"ListedCount","type":["null",{"type":"int","connect.doc":"Returns the number of public lists the user is listed on, or -1 if the count is unavailable."}],"doc":"Returns the number of public lists the user is listed on, or -1 if the count is unavailable.","default":null},{"name":"FollowRequestSent","type":["null",{"type":"boolean","connect.doc":"Returns true if the authenticating user has requested to follow this user, otherwise false."}],"doc":"Returns true if the authenticating user has requested to follow this user, otherwise false.","default":null},{"name":"WithheldInCountries","type":{"type":"array","items":"string","connect.doc":"Returns the list of country codes where the user is withheld"},"doc":"Returns the list of country codes where the user is withheld"}],"connect.doc":"Return the user associated with the status. This can be null if the instance is from User.getStatus().","connect.name":"com.github.jcustenborder.kafka.connect.twitter.User"},"doc":"Return the user associated with the status. 
This can be null if the instance is from User.getStatus()."},{"name":"Retweet","type":["null","boolean"],"default":null},{"name":"Contributors","type":{"type":"array","items":"long","connect.doc":"Returns an array of contributors, or null if no contributor is associated with this status."},"doc":"Returns an array of contributors, or null if no contributor is associated with this status."},{"name":"RetweetCount","type":["null",{"type":"int","connect.doc":"Returns the number of times this tweet has been retweeted, or -1 when the tweet was created before this feature was enabled."}],"doc":"Returns the number of times this tweet has been retweeted, or -1 when the tweet was created before this feature was enabled.","default":null},{"name":"RetweetedByMe","type":["null","boolean"],"default":null},{"name":"CurrentUserRetweetId","type":["null",{"type":"long","connect.doc":"Returns the authenticating user's retweet's id of this tweet, or -1L when the tweet was created before this feature was enabled."}],"doc":"Returns the authenticating user's retweet's id of this tweet, or -1L when the tweet was created before this feature was enabled.","default":null},{"name":"PossiblySensitive","type":["null","boolean"],"default":null},{"name":"Lang","type":["null",{"type":"string","connect.doc":"Returns the lang of the status text if available."}],"doc":"Returns the lang of the status text if available.","default":null},{"name":"WithheldInCountries","type":{"type":"array","items":"string","connect.doc":"Returns the list of country codes where the tweet is withheld"},"doc":"Returns the list of country codes where the tweet is withheld"},{"name":"HashtagEntities","type":["null",{"type":"array","items":{"type":"record","name":"HashtagEntity","fields":[{"name":"Text","type":["null",{"type":"string","connect.doc":"Returns the text of the hashtag without #."}],"doc":"Returns the text of the hashtag without #.","default":null},{"name":"Start","type":["null",{"type":"int","connect.doc":"Returns 
the index of the start character of the hashtag."}],"doc":"Returns the index of the start character of the hashtag.","default":null},{"name":"End","type":["null",{"type":"int","connect.doc":"Returns the index of the end character of the hashtag."}],"doc":"Returns the index of the end character of the hashtag.","default":null}],"connect.doc":"","connect.name":"com.github.jcustenborder.kafka.connect.twitter.HashtagEntity"},"connect.doc":"Returns an array if hashtag mentioned in the tweet."}],"doc":"Returns an array if hashtag mentioned in the tweet.","default":null},{"name":"UserMentionEntities","type":["null",{"type":"array","items":{"type":"record","name":"UserMentionEntity","fields":[{"name":"Name","type":["null",{"type":"string","connect.doc":"Returns the name mentioned in the status."}],"doc":"Returns the name mentioned in the status.","default":null},{"name":"Id","type":["null",{"type":"long","connect.doc":"Returns the user id mentioned in the status."}],"doc":"Returns the user id mentioned in the status.","default":null},{"name":"Text","type":["null",{"type":"string","connect.doc":"Returns the screen name mentioned in the status."}],"doc":"Returns the screen name mentioned in the status.","default":null},{"name":"ScreenName","type":["null",{"type":"string","connect.doc":"Returns the screen name mentioned in the status."}],"doc":"Returns the screen name mentioned in the status.","default":null},{"name":"Start","type":["null",{"type":"int","connect.doc":"Returns the index of the start character of the user mention."}],"doc":"Returns the index of the start character of the user mention.","default":null},{"name":"End","type":["null",{"type":"int","connect.doc":"Returns the index of the end character of the user mention."}],"doc":"Returns the index of the end character of the user mention.","default":null}],"connect.doc":"","connect.name":"com.github.jcustenborder.kafka.connect.twitter.UserMentionEntity"},"connect.doc":"Returns an array of user mentions in the 
tweet."}],"doc":"Returns an array of user mentions in the tweet.","default":null},{"name":"MediaEntities","type":["null",{"type":"array","items":{"type":"record","name":"MediaEntity","fields":[{"name":"Id","type":["null",{"type":"long","connect.doc":"Returns the id of the media."}],"doc":"Returns the id of the media.","default":null},{"name":"Type","type":["null",{"type":"string","connect.doc":"Returns the media type photo, video, animated_gif."}],"doc":"Returns the media type photo, video, animated_gif.","default":null},{"name":"MediaURL","type":["null",{"type":"string","connect.doc":"Returns the media URL."}],"doc":"Returns the media URL.","default":null},{"name":"Sizes","type":{"type":"array","items":{"type":"record","name":"MapEntry","namespace":"io.confluent.connect.avro","fields":[{"name":"key","type":"int"},{"name":"value","type":{"type":"record","name":"Size","namespace":"com.github.jcustenborder.kafka.connect.twitter.MediaEntity","fields":[{"name":"Resize","type":["null",{"type":"int","connect.doc":""}],"doc":"","default":null},{"name":"Width","type":["null",{"type":"int","connect.doc":""}],"doc":"","default":null},{"name":"Height","type":["null",{"type":"int","connect.doc":""}],"doc":"","default":null}],"connect.doc":"","connect.name":"com.github.jcustenborder.kafka.connect.twitter.MediaEntity.Size"},"doc":""}]}}},{"name":"MediaURLHttps","type":["null",{"type":"string","connect.doc":"Returns the media secure URL."}],"doc":"Returns the media secure 
URL.","default":null},{"name":"VideoAspectRatioWidth","type":["null",{"type":"int","connect.doc":""}],"doc":"","default":null},{"name":"VideoAspectRatioHeight","type":["null",{"type":"int","connect.doc":""}],"doc":"","default":null},{"name":"VideoDurationMillis","type":["null",{"type":"long","connect.doc":""}],"doc":"","default":null},{"name":"VideoVariants","type":["null",{"type":"array","items":{"type":"record","name":"Variant","namespace":"com.github.jcustenborder.kafka.connect.twitter.ExtendedMediaEntity","fields":[{"name":"Url","type":["null",{"type":"string","connect.doc":""}],"doc":"","default":null},{"name":"Bitrate","type":["null",{"type":"int","connect.doc":""}],"doc":"","default":null},{"name":"ContentType","type":["null",{"type":"string","connect.doc":""}],"doc":"","default":null}],"connect.doc":"","connect.name":"com.github.jcustenborder.kafka.connect.twitter.ExtendedMediaEntity.Variant"},"connect.doc":"Returns size variations of the media."}],"doc":"Returns size variations of the media.","default":null},{"name":"ExtAltText","type":["null",{"type":"string","connect.doc":""}],"doc":"","default":null},{"name":"URL","type":["null",{"type":"string","connect.doc":"Returns the URL mentioned in the tweet."}],"doc":"Returns the URL mentioned in the tweet.","default":null},{"name":"Text","type":["null",{"type":"string","connect.doc":"Returns the URL mentioned in the tweet."}],"doc":"Returns the URL mentioned in the tweet.","default":null},{"name":"ExpandedURL","type":["null",{"type":"string","connect.doc":"Returns the expanded URL if mentioned URL is shorten."}],"doc":"Returns the expanded URL if mentioned URL is shorten.","default":null},{"name":"Start","type":["null",{"type":"int","connect.doc":"Returns the index of the start character of the URL mentioned in the tweet."}],"doc":"Returns the index of the start character of the URL mentioned in the tweet.","default":null},{"name":"End","type":["null",{"type":"int","connect.doc":"Returns the index of the end 
character of the URL mentioned in the tweet."}],"doc":"Returns the index of the end character of the URL mentioned in the tweet.","default":null},{"name":"DisplayURL","type":["null",{"type":"string","connect.doc":"Returns the display URL if mentioned URL is shorten."}],"doc":"Returns the display URL if mentioned URL is shorten.","default":null}],"connect.doc":"","connect.name":"com.github.jcustenborder.kafka.connect.twitter.MediaEntity"},"connect.doc":"Returns an array of MediaEntities if medias are available in the tweet."}],"doc":"Returns an array of MediaEntities if medias are available in the tweet.","default":null},{"name":"SymbolEntities","type":["null",{"type":"array","items":{"type":"record","name":"SymbolEntity","fields":[{"name":"Start","type":["null",{"type":"int","connect.doc":"Returns the index of the start character of the symbol."}],"doc":"Returns the index of the start character of the symbol.","default":null},{"name":"End","type":["null",{"type":"int","connect.doc":"Returns the index of the end character of the symbol."}],"doc":"Returns the index of the end character of the symbol.","default":null},{"name":"Text","type":["null",{"type":"string","connect.doc":"Returns the text of the entity"}],"doc":"Returns the text of the entity","default":null}],"connect.doc":"","connect.name":"com.github.jcustenborder.kafka.connect.twitter.SymbolEntity"},"connect.doc":"Returns an array of SymbolEntities if medias are available in the tweet."}],"doc":"Returns an array of SymbolEntities if medias are available in the tweet.","default":null},{"name":"URLEntities","type":["null",{"type":"array","items":{"type":"record","name":"URLEntity","fields":[{"name":"URL","type":["null",{"type":"string","connect.doc":"Returns the URL mentioned in the tweet."}],"doc":"Returns the URL mentioned in the tweet.","default":null},{"name":"Text","type":["null",{"type":"string","connect.doc":"Returns the URL mentioned in the tweet."}],"doc":"Returns the URL mentioned in the 
tweet.","default":null},{"name":"ExpandedURL","type":["null",{"type":"string","connect.doc":"Returns the expanded URL if mentioned URL is shorten."}],"doc":"Returns the expanded URL if mentioned URL is shorten.","default":null},{"name":"Start","type":["null",{"type":"int","connect.doc":"Returns the index of the start character of the URL mentioned in the tweet."}],"doc":"Returns the index of the start character of the URL mentioned in the tweet.","default":null},{"name":"End","type":["null",{"type":"int","connect.doc":"Returns the index of the end character of the URL mentioned in the tweet."}],"doc":"Returns the index of the end character of the URL mentioned in the tweet.","default":null},{"name":"DisplayURL","type":["null",{"type":"string","connect.doc":"Returns the display URL if mentioned URL is shorten."}],"doc":"Returns the display URL if mentioned URL is shorten.","default":null}],"connect.doc":"","connect.name":"com.github.jcustenborder.kafka.connect.twitter.URLEntity"},"connect.doc":"Returns an array if URLEntity mentioned in the tweet."}],"doc":"Returns an array if URLEntity mentioned in the tweet.","default":null}],"connect.doc":"Twitter status message.","connect.name":"com.github.jcustenborder.kafka.connect.twitter.Status"}

rmoff commented 6 years ago

Is it the END column tripping things up here?

rodesai commented 6 years ago

Yeah, it looks like the parser fails when a column is named END:

ksql> create stream foo (end bigint) with (kafka_topic='users', value_format='json');
line 1:20: extraneous input 'end' expecting {'ADD', 'APPROXIMATE', 'AT', 'CONFIDENCE', 'NO', 'SUBSTRING', 'POSITION', 'TINYINT', 'SMALLINT', 'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'STRUCT', 'VIEW', 'REPLACE', 'GRANT', 'REVOKE', 'PRIVILEGES', 'PUBLIC', 'OPTION', 'EXPLAIN', 'ANALYZE', 'FORMAT', 'TYPE', 'TEXT', 'GRAPHVIZ', 'LOGICAL', 'DISTRIBUTED', 'TRY', 'SHOW', 'TABLES', 'SCHEMAS', 'CATALOGS', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'TO', 'SYSTEM', 'BERNOULLI', 'POISSONIZED', 'TABLESAMPLE', 'RESCALED', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'WORK', 'ISOLATION', 'LEVEL', 'SERIALIZABLE', 'REPEATABLE', 'COMMITTED', 'UNCOMMITTED', 'READ', 'WRITE', 'ONLY', 'CALL', 'NFD', 'NFC', 'NFKD', 'NFKC', 'IF', 'NULLIF', 'COALESCE', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
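
To see which fields of a registered schema would hit this, a small script can walk the Avro schema JSON and list every record field whose name collides with a reserved word. This is only a rough sketch: `find_reserved_fields`, `ASSUMED_RESERVED` and `SAMPLE` are made-up names for illustration, the reserved set contains only END because that is the only word this thread confirms as failing, and the sample is a trimmed-down stand-in for the Twitter schema above.

```python
import json

# Assumption: only END is confirmed by this thread. The full set of reserved
# words comes from the KSQL grammar and may differ between versions.
ASSUMED_RESERVED = {"END"}


def find_reserved_fields(schema, reserved=ASSUMED_RESERVED, path=""):
    """Return dotted paths of Avro record fields whose name is in `reserved`."""
    hits = []
    if isinstance(schema, list):
        # A union such as ["null", {...}]: inspect every branch.
        for branch in schema:
            hits.extend(find_reserved_fields(branch, reserved, path))
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind == "record":
            for field in schema.get("fields", []):
                name = field["name"]
                field_path = f"{path}.{name}" if path else name
                if name.upper() in reserved:
                    hits.append(field_path)
                hits.extend(find_reserved_fields(field["type"], reserved, field_path))
        elif kind == "array":
            hits.extend(find_reserved_fields(schema["items"], reserved, path))
        elif kind == "map":
            hits.extend(find_reserved_fields(schema["values"], reserved, path))
        elif isinstance(kind, (dict, list)):
            # A wrapped type definition, e.g. {"type": {"type": "record", ...}}.
            hits.extend(find_reserved_fields(kind, reserved, path))
    # Plain strings (primitive types or named references) contain no fields.
    return hits


# Trimmed-down stand-in for the schema above (not the full Twitter schema).
SAMPLE = json.loads("""
{"type": "record", "name": "Status", "fields": [
  {"name": "Text", "type": ["null", "string"], "default": null},
  {"name": "HashtagEntities", "type": ["null", {"type": "array", "items":
    {"type": "record", "name": "HashtagEntity", "fields": [
      {"name": "Start", "type": ["null", "int"], "default": null},
      {"name": "End", "type": ["null", "int"], "default": null}]}}],
   "default": null}
]}
""")

if __name__ == "__main__":
    for field_path in find_reserved_fields(SAMPLE):
        print(field_path)
```

Running the same function over the full schema pasted above should report the End field in each of the entity records, which would be consistent with END being what the parser rejects.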

allenansari174 commented 5 years ago

I was trying to follow the blog step by step, but when I read the data from KSQL all fields are null (no name, no id, no text). Any idea what I need to do? Thanks, @rmoff.