aws-amplify / amplify-js

A declarative JavaScript library for application development using cloud services.
https://docs.amplify.aws/lib/q/platform/js
Apache License 2.0

DynamoDB to OpenSearch streaming Lambda times out #11989

Closed mithun35h closed 12 months ago

mithun35h commented 1 year ago

JavaScript Framework

Not applicable

Amplify APIs

DataStore

Amplify Categories

api

Environment information

``` # Put output below this line System: OS: macOS 13.5.1 CPU: (8) x64 Apple M1 Pro Memory: 18.17 MB / 16.00 GB Shell: 5.9 - /bin/zsh Binaries: Node: 18.16.0 - ~/.volta/tools/image/node/18.16.0/bin/node Yarn: 1.22.19 - ~/Projects/lifedata/halo-prod/halo/node_modules/.bin/yarn npm: 6.14.11 - ~/.volta/tools/image/npm/6.14.11/bin/npm Browsers: Chrome: 116.0.5845.179 Safari: 16.6 npmPackages: <%= name %>: <%= version %> @apollo/client: ^3.7.17 => 3.7.17 @apollo/client/cache: undefined () @apollo/client/core: undefined () @apollo/client/errors: undefined () @apollo/client/link/batch: undefined () @apollo/client/link/batch-http: undefined () @apollo/client/link/context: undefined () @apollo/client/link/core: undefined () @apollo/client/link/error: undefined () @apollo/client/link/http: undefined () @apollo/client/link/persisted-queries: undefined () @apollo/client/link/retry: undefined () @apollo/client/link/schema: undefined () @apollo/client/link/subscriptions: undefined () @apollo/client/link/utils: undefined () @apollo/client/link/ws: undefined () @apollo/client/react: undefined () @apollo/client/react/components: undefined () @apollo/client/react/context: undefined () @apollo/client/react/hoc: undefined () @apollo/client/react/hooks: undefined () @apollo/client/react/parser: undefined () @apollo/client/react/ssr: undefined () @apollo/client/testing: undefined () @apollo/client/testing/core: undefined () @apollo/client/utilities: undefined () @apollo/client/utilities/globals: undefined () @aws-amplify/cli: 12.1.1 => 12.1.1 @aws-amplify/cli-extensibility-helper: ^3.0.1 => 3.0.9 @aws-crypto/client-node: ^3.2.0 => 3.2.2 @aws-lambda-powertools/logger: ^1.8.0 => 1.11.0 @aws-lambda-powertools/metrics: ^1.8.0 => 1.11.0 @aws-lambda-powertools/tracer: ^1.8.0 => 1.11.0 @aws-sdk/client-apigatewaymanagementapi: ^3.299.0 => 3.363.0 @aws-sdk/client-cognito-identity-provider: ^3.299.0 => 3.363.0 @aws-sdk/client-comprehend: ^3.299.0 => 3.363.0 (3.6.1) @aws-sdk/client-dynamodb: 
^3.299.0 => 3.363.0 @aws-sdk/client-dynamodb-streams: ^3.299.0 => 3.363.0 @aws-sdk/client-lambda: ^3.325.0 => 3.363.0 @aws-sdk/client-lex-models-v2: ^3.299.0 => 3.363.0 @aws-sdk/client-lex-runtime-v2: ^3.299.0 => 3.363.0 (3.186.3) @aws-sdk/client-s3: ^3.378.0 => 3.378.0 (3.6.4) @aws-sdk/client-sfn: ^3.363.0 => 3.363.0 @aws-sdk/client-sqs: ^3.299.0 => 3.363.0 @aws-sdk/client-ssm: ^3.353.0 => 3.363.0 @aws-sdk/credential-providers: ^3.299.0 => 3.363.0 @aws-sdk/is-array-buffer: ^3.295.0 => 3.310.0 (3.6.1, 3.186.0) @aws-sdk/lib-dynamodb: ^3.299.0 => 3.365.0 @aws-sdk/s3-request-presigner: ^3.299.0 => 3.367.0 (3.6.1) @aws-sdk/util-dynamodb: ^3.299.0 => 3.365.0 @azure/storage-blob: ^12.12.0 => 12.14.0 @babel/core: 7.12.13 => 7.12.13 (7.22.8, 7.21.8, 7.21.4) @babel/preset-react: ^7.14.5 => 7.22.5 @babel/preset-typescript: 7.12.13 => 7.12.13 (7.22.5, 7.21.5) @clerk/clerk-react: ^4.21.0 => 4.22.0 @clerk/clerk-sdk-node: ^4.10.13 => 4.10.15 @clerk/themes: ^1.7.5 => 1.7.5 @commitlint/cli: ^15.0.0 => 15.0.0 @commitlint/config-conventional: ^15.0.0 => 15.0.0 @cypress/angular: 0.0.0-development @cypress/mount-utils: 0.0.0-development @cypress/react: 0.0.0-development @cypress/react18: 0.0.0-development @cypress/svelte: 0.0.0-development @cypress/vue: 0.0.0-development @cypress/vue2: 0.0.0-development @elastic/datemath: ^5.0.3 => 5.0.3 @elastic/eui: ^66.0.0 => 66.0.0 @emoji-mart/data: ^1.1.2 => 1.1.2 @emoji-mart/react: ^1.1.1 => 1.1.1 @emotion/babel-plugin: 11.10.5 => 11.10.5 (11.11.0) @emotion/react: 11.10.5 => 11.10.5 (11.11.1) @emotion/server: ^11.4.0 => 11.11.0 @emotion/styled: 11.10.5 => 11.10.5 (11.11.0) @hookform/devtools: ^4.1.0 => 4.3.1 @hookform/resolvers: ^2.9.7 => 2.9.11 @hookform/resolvers/ajv: 1.0.0 @hookform/resolvers/class-validator: 1.0.0 @hookform/resolvers/computed-types: 1.0.0 @hookform/resolvers/io-ts: 1.0.0 @hookform/resolvers/joi: 1.0.0 @hookform/resolvers/nope: 1.0.0 @hookform/resolvers/superstruct: 1.0.0 @hookform/resolvers/typanion: 1.0.0 
@hookform/resolvers/vest: 1.0.0 @hookform/resolvers/yup: 1.0.0 @hookform/resolvers/zod: 1.0.0 @iconify/icons-ant-design: ^1.1.1 => 1.2.5 @iconify/icons-eva: ^1.1.0 => 1.2.6 @iconify/icons-ic: ^1.1.15 => 1.2.13 @iconify/react: ^3.1.0 => 3.2.2 @iconify/react/offline: undefined () @middy/core: ^4.5.0 => 4.5.5 @monaco-editor/react: ^4.4.6 => 4.5.1 @mui/icons-material: ^5.11.0 => 5.13.7 @mui/lab: ^5.0.0-alpha.118 => 5.0.0-alpha.135 @mui/material: ^5.11.7 => 5.13.7 @mui/x-data-grid-premium: ^5.17.22 => 5.17.26 @mui/x-date-pickers-pro: ^5.0.17 => 5.0.20 @mui/x-license-pro: ^5.17.12 => 5.17.12 @nestjs/common: 9.4.1 => 9.4.1 @nestjs/core: 9.4.1 => 9.4.1 @nestjs/platform-express: 9.4.1 => 9.4.1 @nestjs/schematics: 9.1.0 => 9.1.0 (9.2.0) @nestjs/testing: 9.4.1 => 9.4.1 @nx/cypress: 16.1.4 => 16.1.4 @nx/devkit: 16.1.4 => 16.1.4 (16.5.0) @nx/esbuild: 16.1.4 => 16.1.4 @nx/eslint-plugin: 16.1.4 => 16.1.4 @nx/jest: 16.1.4 => 16.1.4 (16.5.0) @nx/js: 16.1.4 => 16.1.4 (16.5.0) @nx/linter: 16.1.4 => 16.1.4 (16.5.0) @nx/nest: ^16.4.0 => 16.5.0 @nx/node: 16.1.4 => 16.1.4 (16.5.0) @nx/plugin: 16.1.4 => 16.1.4 @nx/react: 16.1.4 => 16.1.4 @nx/storybook: 16.1.4 => 16.1.4 @nx/web: 16.1.4 => 16.1.4 @nx/webpack: 16.1.4 => 16.1.4 @nx/workspace: 16.1.4 => 16.1.4 (16.5.0) @opensearch-project/opensearch: ^2.2.0 => 2.2.1 @pmmmwh/react-refresh-webpack-plugin: ^0.5.7 => 0.5.10 @reduxjs/toolkit: 1.9.0 => 1.9.0 @reduxjs/toolkit-query: 1.0.0 @reduxjs/toolkit-query-react: 1.0.0 @sentry/react: ^7.12.1 => 7.57.0 @sentry/tracing: ^7.12.1 => 7.57.0 @sparticuz/chromium: ^113.0.1 => 113.0.1 @storybook/addon-actions: ^7.0.12 => 7.0.26 (7.0.12) @storybook/addon-essentials: 7.0.12 => 7.0.12 @storybook/addon-storysource: ^7.0.12 => 7.0.26 @storybook/core-server: 7.0.12 => 7.0.12 @storybook/react: 7.0.12 => 7.0.12 @storybook/react-webpack5: 7.0.12 => 7.0.12 @svgr/webpack: ^6.1.2 => 6.5.1 @swc-node/register: ~1.4.2 => 1.4.2 @swc/cli: ~0.1.62 => 0.1.62 @swc/core: ~1.3.51 => 1.3.68 @swc/helpers: ~0.5.0 => 0.5.1 
@testing-library/jest-dom: ^5.16.2 => 5.16.5 @testing-library/react: 13.4.0 => 13.4.0 @types/archiver: ^5.3.2 => 5.3.2 @types/aws-lambda: ^8.10.115 => 8.10.119 @types/aws-sdk: latest => 2.7.0 @types/aws4: ^1.11.2 => 1.11.3 @types/bcrypt: ^5.0.0 => 5.0.0 @types/draft-js: ^0.11.6 => 0.11.12 @types/draftjs-to-html: ^0.8.1 => 0.8.1 @types/escape-html: ^1.0.2 => 1.0.2 @types/exceljs: ^1.3.0 => 1.3.0 @types/express: ^4.17.17 => 4.17.17 (4.17.14) @types/facebook-js-sdk: ^3.3.6 => 3.3.6 @types/google-map-react: ^2.1.7 => 2.1.7 @types/html-to-text: ^8.1.1 => 8.1.1 @types/is-url: ^1.2.30 => 1.2.30 @types/jest: 29.4.4 => 29.4.4 (29.5.2) @types/json-logic-js: ^1.2.1 => 1.2.1 @types/lodash: ^4.14.177 => 4.14.195 (4.14.196) @types/multer: ^1.4.7 => 1.4.7 @types/node: 18.11.9 => 18.11.9 (16.18.6, 20.4.1, 14.18.53, 18.15.11, 16.18.38, 18.13.0, 16.9.1) @types/nprogress: ^0.2.0 => 0.2.0 @types/numeral: ^2.0.2 => 2.0.2 (0.0.28) @types/qrcode: ^1.5.0 => 1.5.1 @types/qrcode-terminal: ^0.12.0 => 0.12.0 @types/react: 18.0.25 => 18.0.25 (18.2.14) @types/react-color: ^3.0.6 => 3.0.6 @types/react-copy-to-clipboard: ^5.0.2 => 5.0.4 @types/react-dom: 18.0.9 => 18.0.9 (18.2.6) @types/react-draft-wysiwyg: ^1.13.3 => 1.13.4 @types/react-lazy-load-image-component: ^1.5.2 => 1.5.3 @types/react-router-dom: 5.3.3 => 5.3.3 @types/react-slick: ^0.23.10 => 0.23.10 @types/tar: ^6.1.5 => 6.1.5 @types/unzipper: ^0.10.6 => 0.10.6 @types/validator: ^13.7.12 => 13.7.17 @types/winston: ^2.4.4 => 2.4.4 @typescript-eslint/eslint-plugin: 5.59.6 => 5.59.6 @typescript-eslint/parser: 5.59.6 => 5.59.6 (5.10.1) @vonage/server-sdk: ^3.2.0 => 3.6.0 @wppconnect-team/wppconnect: ^1.28.0 => 1.28.0 ahooks: ^3.7.4 => 3.7.8 apexcharts: ^3.32.0 => 3.41.0 archiver: ^5.3.1 => 5.3.1 aws-amplify: ^5.0.22 => 5.3.3 aws-appsync-auth-link: ^3.0.7 => 3.0.7 aws-appsync-subscription-link: ^3.0.10 => 3.1.2 aws-lambda: latest => 1.0.7 aws-serverless-express: ^3.4.0 => 3.4.0 aws4: ^1.12.0 => 1.12.0 axios: ^1.0.0 => 1.4.0 (0.26.0, 0.26.1, 
1.5.0, 1.1.3, 0.21.4) babel-jest: 29.4.3 => 29.4.3 (29.6.1) babel-loader: 8.1.0 => 8.1.0 (9.1.3, 8.3.0) babel-plugin-polyfill-corejs2: ^0.3.1 => 0.3.3 (0.4.4) babel-plugin-polyfill-corejs3: ^0.8.1 => 0.8.2 (0.6.0) babel-plugin-polyfill-regenerator: ^0.3.1 => 0.3.1 (0.5.1, 0.4.1) bcrypt: ^5.1.0 => 5.1.0 bsd-3-module: 0.0.0 businesscommunications: ^2.1.0 => 2.1.0 businessmessages: ^1.0.3 => 1.0.4 change-case: ^4.1.2 => 4.1.2 compromise: ^14.4.5 => 14.9.0 (14.4.4) compromise-dates: ^3.4.1 => 3.4.1 core-js: ^3.6.5 => 3.31.1 css-loader: ^6.4.0 => 6.8.1 (5.2.7) csv-parser: ^3.0.0 => 3.0.0 custom-license: 0.0.0 cypress: 12.12.0 => 12.12.0 dayjs: ^1.11.5 => 1.11.9 draft-js: ^0.11.7 => 0.11.7 draftjs-to-html: ^0.9.1 => 0.9.1 emoji-mart: ^5.5.2 => 5.5.2 esbuild: 0.17.17 => 0.17.17 (0.17.19) escape-html: ^1.0.3 => 1.0.3 eslint: ^8.18.0 => 8.44.0 eslint-config-next: 12.1.5 => 12.1.5 eslint-config-prettier: ^8.5.0 => 8.8.0 eslint-plugin-cypress: ^2.10.3 => 2.13.3 eslint-plugin-import: 2.26.0 => 2.26.0 (2.25.2) eslint-plugin-jsx-a11y: 6.6.1 => 6.6.1 (6.5.1) eslint-plugin-prettier: ^4.0.0 => 4.2.1 eslint-plugin-react: 7.31.11 => 7.31.11 (7.29.1) eslint-plugin-react-hooks: 4.6.0 => 4.6.0 (4.3.0) eslint-plugin-storybook: ^0.6.12 => 0.6.12 exceljs: ^4.3.0 => 4.3.0 file-loader: ^6.2.0 => 6.2.0 firebase: ^9.6.0 => 9.23.0 firebase-admin: ^10.2.0 => 10.3.0 firebase/analytics: undefined () firebase/app: undefined () firebase/app-check: undefined () firebase/auth: undefined () firebase/auth/cordova: undefined () firebase/auth/react-native: undefined () firebase/compat: undefined () firebase/compat/analytics: undefined () firebase/compat/app: undefined () firebase/compat/app-check: undefined () firebase/compat/auth: undefined () firebase/compat/database: undefined () firebase/compat/firestore: undefined () firebase/compat/functions: undefined () firebase/compat/installations: undefined () firebase/compat/messaging: undefined () firebase/compat/performance: undefined () 
firebase/compat/remote-config: undefined () firebase/compat/storage: undefined () firebase/database: undefined () firebase/firestore: undefined () firebase/firestore/lite: undefined () firebase/functions: undefined () firebase/installations: undefined () firebase/messaging: undefined () firebase/messaging/sw: undefined () firebase/performance: undefined () firebase/remote-config: undefined () firebase/storage: undefined () fork-ts-checker-webpack-plugin: ^7.3.0 => 7.3.0 (7.2.13, 4.1.6) form-data: ^4.0.0 => 4.0.0 (3.0.1, 2.3.3) formik: ^2.2.9 => 2.4.2 framer-motion: ^6.3.11 => 6.5.1 generate-password: ^1.7.0 => 1.7.0 google-auth-library: ^8.5.1 => 8.9.0 (7.14.1) google-map-react: ^2.2.0 => 2.2.1 googleapis: ^107.0.0 => 107.0.0 graphql: ^16.6.0 => 16.7.1 (15.8.0) graphql-request: ^3.7.0 => 3.7.0 graphql-ws: ^5.14.0 => 5.14.0 handlebars: ^4.7.7 => 4.7.7 highcharts: ^11.1.0 => 11.1.0 highcharts-react-official: ^3.2.0 => 3.2.0 highlight.js: ^11.4.0 => 11.8.0 (10.7.3) html-to-text: ^8.2.1 => 8.2.1 husky: ^7.0.0 => 7.0.4 i18next: ^21.6.3 => 21.10.0 i18next-browser-languagedetector: ^6.1.2 => 6.1.8 invalid-with-comma: 0.0.0 is-hotkey: ^0.2.0 => 0.2.0 (0.1.8) is-url: ^1.2.4 => 1.2.4 it-compromise: ^0.0.5 => 0.0.5 jest: 29.4.3 => 29.4.3 jest-environment-jsdom: 29.4.3 => 29.4.3 jest-environment-node: ^29.4.1 => 29.6.1 jimp: ^0.16.2 => 0.16.13 js-cookie: ^3.0.1 => 3.0.5 (3.0.1, 2.2.1) js-sha256: ^0.9.0 => 0.9.0 json-logic-js: ^2.0.2 => 2.0.2 jsonc-eslint-parser: ^2.1.0 => 2.3.0 jsonwebtoken: ^9.0.0 => 9.0.1 (8.5.1) jsx-runtime: 1.0.0 jwks-rsa: ^3.0.1 => 3.0.1 (2.1.5) jwt-decode: ^3.1.2 => 3.1.2 license-checker: ^25.0.1 => 25.0.1 lodash: ^4.17.21 => 4.17.21 microbundle-crl: ^0.13.11 => 0.13.11 moment: ^2.29.3 => 2.29.4 nanoid: ^3.3.1 => 3.3.6 node-polyfill-webpack-plugin: ^2.0.1 => 2.0.1 notistack: ^2.0.3 => 2.0.8 nprogress: ^0.2.0 => 0.2.0 numeral: ^2.0.6 => 2.0.6 nx: 16.1.4 => 16.1.4 (16.5.0) nx-cloud: 16.0.5 => 16.0.5 openai: ^3.2.1 => 3.3.0 p-map: ^6.0.0 => 6.0.0 (4.0.0) 
p-retry: ^5.1.2 => 5.1.2 (4.6.2) phone: ^3.1.28 => 3.1.37 preact: ^10.6.4 => 10.15.1 preact-cli: ^3.3.3 => 3.4.6 preact-compat: 4.0.0 preact-debug: 1.0.0 preact-devtools: 1.0.0 preact-habitat: ^3.3.0 => 3.3.0 preact-hooks: 0.1.0 preact-render-to-string: ^5.1.19 => 5.2.6 prettier: 2.6.2 => 2.6.2 (2.8.8) private: 0.0.0 prop-types: ^15.8.1 => 15.8.1 public-domain-module: 0.0.0 qrcode: ^1.5.1 => 1.5.3 qrcode-reader: ^1.0.4 => 1.0.4 qrcode-terminal: ^0.12.0 => 0.12.0 quill-emoji: ^0.2.0 => 0.2.0 react: 18.2.0 => 18.2.0 react-apexcharts: ^1.3.9 => 1.4.0 react-calendly: ^4.1.0 => 4.1.1 react-color: ^2.19.3 => 2.19.3 react-copy-to-clipboard: ^5.1.0 => 5.1.0 react-dom: 18.2.0 => 18.2.0 react-draft-wysiwyg: ^1.14.7 => 1.15.0 react-dropzone: ^12.0.4 => 12.1.0 (11.7.1) react-google-charts: ^4.0.0 => 4.0.0 react-helmet-async: ^1.3.0 => 1.3.0 react-hook-form: ^7.20.5 => 7.45.1 react-i18next: ^11.14.3 => 11.18.6 react-image-lightbox: ^5.1.4 => 5.1.4 react-intersection-observer: ^8.32.5 => 8.34.0 react-is: 18.2.0 => 18.2.0 (17.0.2, 18.1.0, 16.13.1) react-json-pretty: ^2.2.0 => 2.2.0 react-json-view: ^1.21.3 => 1.21.3 react-lazy-load-image-component: ^1.5.6 => 1.6.0 react-quill: ^2.0.0 => 2.0.0 react-quill-emoji: ^0.1.9 => 0.1.9 react-refresh: ^0.10.0 => 0.10.0 (0.11.0) react-router: ^6.6.1 => 6.14.1 react-router-dom: ^6.6.1 => 6.14.1 react-scroll: ^1.8.4 => 1.8.9 react-slick: ^0.29.0 => 0.29.0 react-spring: ^9.3.2 => 9.7.2 react-test-renderer: 18.2.0 => 18.2.0 redux-persist: ^6.0.0 => 6.0.0 redux-persist/integration/react: undefined () reflect-metadata: ^0.1.13 => 0.1.13 regenerator-runtime: 0.13.7 => 0.13.7 (0.13.11) retire: ^3.2.1 => 3.2.4 rxjs: ^7.8.0 => 7.8.1 (6.6.7) rxjs/ajax: undefined () rxjs/fetch: undefined () rxjs/internal-compatibility: undefined () rxjs/operators: undefined () rxjs/testing: undefined () rxjs/webSocket: undefined () seamless-scroll-polyfill: ^2.2.0 => 2.3.4 serve: ^13.0.2 => 13.0.4 simplebar-react: ^2.4.3 => 2.4.3 slate: ^0.94.1 => 0.94.1 slate-history: 
^0.93.0 => 0.93.0 slate-hyperscript: ^0.77.0 => 0.77.0 slate-react: ^0.98.1 => 0.98.1 slick-carousel: ^1.8.1 => 1.8.1 storybook: 7.0.12 => 7.0.12 style-loader: ^3.3.0 => 3.3.3 (2.0.0) stylus: ^0.55.0 => 0.55.0 stylus-loader: ^7.1.0 => 7.1.3 svix: ^1.5.0 => 1.5.2 tar: ^6.1.15 => 6.1.15 (4.4.19, 6.1.11) test-utils: 0.1.0 ts-jest: 29.1.0 => 29.1.0 ts-node: 10.9.1 => 10.9.1 (9.1.1) tslib: ^2.5.2 => 2.6.0 (1.14.1, 2.5.2, 2.4.1, 2.5.0, 1.10.0) typescript: 5.0.4 => 5.0.4 (4.9.5, 3.9.10, 4.6.4, 4.4.4) unzipper: ^0.10.14 => 0.10.14 url-loader: ^4.1.1 => 4.1.1 use-sound: ^4.0.1 => 4.0.1 uuidv4: ^6.2.13 => 6.2.13 validator: ^13.9.0 => 13.9.0 web-vitals: ^2.1.4 => 2.1.4 webpack: ^5.75.0 => 5.88.1 (4.46.0) webpack-merge: ^5.8.0 => 5.9.0 whatsapp-cloud-api: ^0.3.1 => 0.3.1 whatsapp-web.js: ^1.20.0 => 1.21.0 widgets-preact-boilerplate: 0.9.0 winston: ^3.7.2 => 3.9.0 (3.10.0) yarn: ^1.22.17 => 1.22.19 yup: ^0.32.11 => 0.32.11 zod: ^3.18.0 => 3.21.4 npmGlobalPackages: corepack: 0.17.0 npm: 9.5.1 ```

Describe the bug

Using the @searchable directive in the GraphQL schema creates a Lambda that streams changes from DynamoDB to OpenSearch.

We often see this Lambda throw a timeout error, which leaves OpenSearch out of sync with DynamoDB, with records missing.

Expected behavior

No Timeout issues

Reproduction steps

NA

Code Snippet

import base64
import json
import logging
import os
import time
import traceback
from urllib.parse import urlparse, quote

from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import get_credentials
from botocore.endpoint import BotocoreHTTPSession
from botocore.session import Session
from boto3.dynamodb.types import TypeDeserializer

# https://github.com/aws-amplify/amplify-category-api/blob/main/packages/amplify-graphql-searchable-transformer/streaming-lambda/python_streaming_function.py
# The following parameters are required to configure the OpenSearch cluster
OPENSEARCH_ENDPOINT = os.environ['OPENSEARCH_ENDPOINT']
OPENSEARCH_REGION = os.environ['OPENSEARCH_REGION']
DEBUG = os.environ['DEBUG'] == "1"
OPENSEARCH_USE_EXTERNAL_VERSIONING = os.environ['OPENSEARCH_USE_EXTERNAL_VERSIONING'] == "true"

# Multiple mapping types in an index is deprecated in OpenSearch ver 7.10+. Default to _doc.
DOC_TYPE = '_doc'
OPENSEARCH_MAX_RETRIES = 3 # Max number of retries for exponential backoff

logger = logging.getLogger()
logger.setLevel(logging.DEBUG if DEBUG else logging.INFO)
logger.info("Streaming to Open search 2.7.0")

# custom encoder changes
# - sets to lists
class DDBTypesEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, set):
            return list(obj)
        return json.JSONEncoder.default(self, obj)

# Subclass of boto's TypeDeserializer for DynamoDB to adjust for DynamoDB Stream format.
class StreamTypeDeserializer(TypeDeserializer):
    def _deserialize_n(self, value):
        val = float(value)
        if (val.is_integer()):
            return int(value)
        else:
            return val

    def _deserialize_b(self, value):
        return value  # Already in Base64

class Searchable_Exception(Exception):
    '''Capture status_code from request'''
    status_code = 0
    payload = ''

    def __init__(self, status_code, payload):
        self.status_code = status_code
        self.payload = payload
        Exception.__init__(
            self, 'Searchable_Exception: status_code={}, payload={}'.format(status_code, payload))

# Low-level POST data to Amazon OpenSearch Service generating a Sigv4 signed request
def post_data_to_opensearch(payload, region, creds, host, path, method='POST', proto='https://'):
    '''Post data to OpenSearch endpoint with SigV4 signed http headers'''
    req = AWSRequest(method=method, url=proto + host +
                    quote(path), data=payload, headers={'Host': host, 'Content-Type': 'application/json'})
    # SigV4Auth may be expecting 'es' but need to swap with 'os' or 'OpenSearch'
    SigV4Auth(creds, 'es', region).add_auth(req)
    http_session = BotocoreHTTPSession()
    res = http_session.send(req.prepare())
    if res.status_code >= 200 and res.status_code <= 299:
        return res._content
    else:
        raise Searchable_Exception(res.status_code, res._content)

# High-level POST data to Amazon OpenSearch Service with exponential backoff
# according to suggested algorithm: http://docs.aws.amazon.com/general/latest/gr/api-retries.html
def post_to_opensearch(payload):
    '''Post data to OpenSearch cluster with exponential backoff'''

    # Get aws_region and credentials to post signed URL to OpenSearch
    opensearch_region = OPENSEARCH_REGION or os.environ['AWS_REGION']
    session = Session({'region': opensearch_region})
    creds = get_credentials(session)
    opensearch_url = urlparse(OPENSEARCH_ENDPOINT)
    # Extract the domain name in OPENSEARCH_ENDPOINT
    opensearch_endpoint = opensearch_url.netloc or opensearch_url.path

    # Post data with exponential backoff
    retries = 0
    while retries < OPENSEARCH_MAX_RETRIES:
        if retries > 0:
            seconds = (2 ** retries) * .1
            logger.debug('Waiting for %.1f seconds', seconds)
            time.sleep(seconds)

        try:
            opensearch_ret_str = post_data_to_opensearch(
                payload, opensearch_region, creds, opensearch_endpoint, '/_bulk')
            logger.debug('Return from OpenSearch: %s', opensearch_ret_str)
            opensearch_ret = json.loads(opensearch_ret_str)

            if opensearch_ret['errors']:
                logger.error(
                    'OpenSearch post unsuccessful, errors present, took=%sms', opensearch_ret['took'])
                # Filter errors
                opensearch_errors = [item for item in opensearch_ret['items']
                            if item.get('index', {}).get('error')]
                logger.error('List of items with errors: %s',
                            json.dumps(opensearch_errors))
            else:
                logger.info('OpenSearch post successful, took=%sms', opensearch_ret['took'])
            break  # Sending to OpenSearch was ok, break retry loop
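        except Searchable_Exception as e:
            if (e.status_code >= 500) and (e.status_code <= 599):
                retries += 1  # Candidate for retry
            else:
                raise  # Stop retrying, re-raise exception
    else:
        # Loop ended without a successful post: every attempt returned a 5xx error
        logger.error('Giving up on OpenSearch post after %s attempts', OPENSEARCH_MAX_RETRIES)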

# Extracts the DynamoDB table from an ARN
# ex: arn:aws:dynamodb:eu-west-1:123456789012:table/table-name/stream/2015-11-13T09:23:17.104 should return 'table-name'
def get_table_name_from_arn(arn):
    return arn.split(':')[5].split('/')[1]

# Compute a compound doc index from the key(s) of the object in lexicographic order: "k1=key_val1|k2=key_val2"
def compute_doc_index(keys_raw, deserializer, formatIndex=False):
    index = []
    for key in sorted(keys_raw):
        if formatIndex:
            index.append('{}={}'.format(
                key, deserializer.deserialize(keys_raw[key])))
        else:
            index.append(deserializer.deserialize(keys_raw[key]))
    return '|'.join(map(str,index))

def _lambda_handler(event, context):
    logger.debug('Event: %s', event)
    records = event['Records']

    ddb_deserializer = StreamTypeDeserializer()
    opensearch_actions = []  # Items to be added/updated/removed from OpenSearch - for bulk API
    cnt_insert = cnt_modify = cnt_remove = 0

    for record in records:
        # Handle both native DynamoDB Streams or Streams data from Kinesis (for manual replay)
        logger.debug('Record: %s', record)
        if record.get('eventSource') == 'aws:dynamodb':
            ddb = record['dynamodb']
            ddb_table_name = get_table_name_from_arn(record['eventSourceARN'])
            doc_seq = ddb['SequenceNumber']
        elif record.get('eventSource') == 'aws:kinesis':
            ddb = json.loads(base64.b64decode(record['kinesis']['data']))
            ddb_table_name = ddb['SourceTable']
            doc_seq = record['kinesis']['sequenceNumber']
        else:
            logger.error('Ignoring non-DynamoDB event sources: %s',
                        record.get('eventSource'))
            continue

        # Compute DynamoDB table, type and index for item
        doc_table = ddb_table_name.lower()
        doc_type = DOC_TYPE
        doc_table_parts = doc_table.split('-')
        doc_opensearch_index_name = doc_table_parts[0] if len(doc_table_parts) > 0  else doc_table

        # Dispatch according to event TYPE
        event_name = record['eventName'].upper()  # INSERT, MODIFY, REMOVE
        logger.debug('doc_table=%s, event_name=%s, seq=%s',
                    doc_table, event_name, doc_seq)

        # Treat events from a Kinesis stream as INSERTs
        if event_name == 'AWS:KINESIS:RECORD':
            event_name = 'INSERT'

        is_ddb_insert_or_update = (event_name == 'INSERT') or (event_name == 'MODIFY')
        is_ddb_delete = event_name == 'REMOVE'
        image_name = 'NewImage' if is_ddb_insert_or_update else 'OldImage'

        if image_name not in ddb:
            logger.warning(
                'Cannot process stream if it does not contain ' + image_name)
            continue
        logger.debug(image_name + ': %s', ddb[image_name])
        # Deserialize DynamoDB type to Python types
        doc_fields = ddb_deserializer.deserialize({'M': ddb[image_name]})

        # Sync enabled APIs do soft delete. We need to delete the record in OpenSearch if _deleted field is set
        if OPENSEARCH_USE_EXTERNAL_VERSIONING and event_name == 'MODIFY' and '_deleted' in  doc_fields and doc_fields['_deleted']:
            is_ddb_insert_or_update = False
            is_ddb_delete = True

        # Update counters
        if event_name == 'INSERT':
            cnt_insert += 1
        elif event_name == 'MODIFY':
            cnt_modify += 1
        elif event_name == 'REMOVE':
            cnt_remove += 1
        else:
            logger.warning('Unsupported event_name: %s', event_name)

        logger.debug('Deserialized doc_fields: %s', doc_fields)

        if ('Keys' in ddb):
            doc_id = compute_doc_index(ddb['Keys'], ddb_deserializer)
        else:
            logger.error('Cannot find keys in ddb record')
            continue  # Skip this record: without keys there is no doc_id to index or delete

        # If DynamoDB INSERT or MODIFY, send 'index' to OpenSearch
        if is_ddb_insert_or_update:
            # Generate OpenSearch payload for item
            action = {'index': {'_index': doc_opensearch_index_name,
                                '_id': doc_id}}
            # Add external versioning if necessary
            if OPENSEARCH_USE_EXTERNAL_VERSIONING and '_version' in doc_fields:
                action['index'].update([
                    ('version_type', 'external'),
                    ('version', doc_fields['_version'])
                ])
                doc_fields.pop('_ttl', None)
                doc_fields.pop('_version', None)
            # Append OpenSearch Action line with 'index' directive
            opensearch_actions.append(json.dumps(action))
            # Append JSON payload
            opensearch_actions.append(json.dumps(doc_fields, cls=DDBTypesEncoder))
            # migration step remove old key if it exists
            if ('id' in doc_fields) and (event_name == 'MODIFY') :
                action = {'delete': {'_index': doc_opensearch_index_name, 
                    '_id': compute_doc_index(ddb['Keys'], ddb_deserializer, True)}}
                opensearch_actions.append(json.dumps(action))
        # If DynamoDB REMOVE, send 'delete' to OpenSearch
        elif is_ddb_delete:
            action = {'delete': {'_index': doc_opensearch_index_name,
                                 '_id': doc_id}}
            if OPENSEARCH_USE_EXTERNAL_VERSIONING and '_version' in doc_fields:
                action['delete'].update([
                    ('version_type', 'external'),
                    ('version', doc_fields['_version'])
                ])
            # Action line with 'delete' directive
            opensearch_actions.append(json.dumps(action))

    # Prepare bulk payload
    opensearch_actions.append('')  # Add one empty line to force final \n
    opensearch_payload = '\n'.join(opensearch_actions)
    logger.info('Posting to OpenSearch: inserts=%s updates=%s deletes=%s, total_lines=%s, bytes_total=%s',
                cnt_insert, cnt_modify, cnt_remove, len(opensearch_actions) - 1, len(opensearch_payload))
    post_to_opensearch(opensearch_payload)  # Post to OpenSearch with exponential backoff

# Global lambda handler - catches all exceptions to avoid dead letter in the DynamoDB Stream
def lambda_handler(event, context):
    try:
        return _lambda_handler(event, context)
    except Exception:
        logger.error(traceback.format_exc())
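
The retry loop in `post_to_opensearch` above sleeps and retries without checking how much of the Lambda's time budget is left. A minimal sketch of such a check follows, assuming the handler passes in `context.get_remaining_time_in_millis()` from the Lambda context object. The helper name `can_retry` and the `request_budget_s` default are hypothetical and not part of the Amplify-generated function.

```python
def can_retry(remaining_ms, next_delay_s, request_budget_s=5.0):
    """Return True if a backoff sleep plus one more request fits in the time left.

    remaining_ms     -- e.g. context.get_remaining_time_in_millis() in a Lambda handler
    next_delay_s     -- the backoff sleep that would precede the next attempt
    request_budget_s -- assumed upper bound for one bulk request (hypothetical value)
    """
    needed_ms = (next_delay_s + request_budget_s) * 1000
    return remaining_ms > needed_ms


# With 30 s left, a 0.2 s sleep plus one request fits; with 4 s left it does not.
print(can_retry(30000, 0.2))
print(can_retry(4000, 0.4))
```

A check like this only lets the function log and exit cleanly before the deadline; it does not make a slow bulk request faster.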

Log output

```
// Put your logs below this line
START RequestId: b6ffa65b-bd16-4001-857a-368b6e38bfa5 Version: $LATEST
2023-09-07T20:26:14.668+04:00 [INFO] 2023-09-07T16:26:14.668Z b6ffa65b-bd16-4001-857a-368b6e38bfa5 Posting to OpenSearch: inserts=0 updates=3 deletes=0, total_lines=9, bytes_total=7251
    @ingestionTime 1694103983687
    @log 185565015866:/aws/lambda/amplify-halo-prod-205015--OpenSearchStreamingLambd-5hCk132W1UpT
    @logStream 2023/09/07/[$LATEST]962934ff50964881a9a21268b1a88050
    @requestId b6ffa65b-bd16-4001-857a-368b6e38bfa5
    @timestamp 1694103974668
2023-09-07T20:26:14.688+04:00 [INFO] 2023-09-07T16:26:14.688Z b6ffa65b-bd16-4001-857a-368b6e38bfa5 Found credentials in environment variables.
2023-09-07T20:26:14.708+04:00 [INFO] 2023-09-07T16:26:14.708Z b6ffa65b-bd16-4001-857a-368b6e38bfa5 Starting new HTTPS connection (1): search-amplify-opense-zdjdy8c5pdif-ekmmx63wwn5f5dhywb5yu7fxqa.eu-central-1.es.amazonaws.com
    @ingestionTime 1694103983687
    @log 185565015866:/aws/lambda/amplify-halo-prod-205015--OpenSearchStreamingLambd-5hCk132W1UpT
    @logStream 2023/09/07/[$LATEST]962934ff50964881a9a21268b1a88050
    @requestId b6ffa65b-bd16-4001-857a-368b6e38bfa5
    @timestamp 1694103974708
2023-09-07T20:26:44.658+04:00 2023-09-07T16:26:44.658Z b6ffa65b-bd16-4001-857a-368b6e38bfa5 Task timed out after 30.03 seconds
    @ingestionTime 1694104004684
    @log 185565015866:/aws/lambda/amplify-halo-prod-205015--OpenSearchStreamingLambd-5hCk132W1UpT
    @logStream 2023/09/07/[$LATEST]962934ff50964881a9a21268b1a88050
    @requestId b6ffa65b-bd16-4001-857a-368b6e38bfa5
    @timestamp 1694104004658
```
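
The log shows roughly 30 seconds between "Starting new HTTPS connection" (16:26:14.708Z) and "Task timed out" (16:26:44.658Z). For comparison, the sketch below works out the backoff schedule from the formula `(2 ** retries) * .1` used in `post_to_opensearch`, with `OPENSEARCH_MAX_RETRIES = 3` as in the snippet. The helper name `backoff_delays` is hypothetical and exists only for this illustration.

```python
OPENSEARCH_MAX_RETRIES = 3  # Value taken from the code snippet in this issue


def backoff_delays(max_retries):
    """Sleep durations in seconds before each retry, per (2 ** retries) * .1.

    The loop makes max_retries attempts in total and sleeps only before
    attempts 2..max_retries, so retries runs from 1 to max_retries - 1.
    """
    return [(2 ** retries) * .1 for retries in range(1, max_retries)]


delays = backoff_delays(OPENSEARCH_MAX_RETRIES)
total_sleep = sum(delays)
print('sleeps:', delays, 'total: %.1f s' % total_sleep)
```

With three attempts there are only two sleeps, which add up to well under a second. The backoff sleeps alone would therefore not account for a 30-second gap, which suggests the time is being spent waiting on the bulk request itself.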

cwomack commented 1 year ago

Hello, @mithun35h and thank you for opening this issue. Can you clarify what your lambda timeout is currently set to? I'm curious to see if it's still at the default setting of 3 seconds and if increasing it to something like 10 seconds changes the behavior at all.
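
One way to answer the timeout question programmatically is sketched below. It assumes a boto3 Lambda client (`boto3.client('lambda')`) is passed in as `client`. The helper name `ensure_timeout` and the function name in the usage comment are hypothetical, while `get_function_configuration` and `update_function_configuration` are the boto3 Lambda client calls.

```python
def ensure_timeout(client, function_name, minimum_seconds):
    """Read a Lambda's configured timeout and raise it if it is below a minimum.

    client is expected to behave like boto3.client('lambda').
    Returns the timeout (in seconds) in effect after the call.
    """
    config = client.get_function_configuration(FunctionName=function_name)
    current = config['Timeout']
    if current < minimum_seconds:
        client.update_function_configuration(
            FunctionName=function_name, Timeout=minimum_seconds)
        return minimum_seconds
    return current


# Usage (requires AWS credentials; the function name is a placeholder):
#   import boto3
#   ensure_timeout(boto3.client('lambda'), 'my-streaming-function', 10)
```

Because the streaming Lambda is created by Amplify, a manual change like this may be overwritten on the next deployment, so treat it as a diagnostic step rather than a permanent fix.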

mithun35h commented 1 year ago

It's set to 30 s, and memory has been increased to 512 MB.

I also enabled logging in OpenSearch, and I don't see the timeout issue there in either the index logs or the application logs.

I increased the OpenSearch instance sizes and added 2 nodes to make sure the cluster health is green.

mithun35h commented 12 months ago

This was due to an indexing rate error. I optimised the cluster by reducing the number of replicas and the number of shards.

I also increased the Lambda timeout further.

Timeout errors are no longer seen.
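
As an illustration of the replica change described above, the sketch below builds an index settings update request. It assumes the standard `_settings` endpoint and the dynamic `index.number_of_replicas` setting. The helper name `replica_settings_request` and the index name `todo` are hypothetical, and the thread does not say which values were actually used.

```python
import json


def replica_settings_request(index_name, replicas):
    """Build (method, path, body) for updating an index's replica count."""
    if replicas < 0:
        raise ValueError('replicas must be zero or greater')
    path = '/{}/_settings'.format(index_name)
    body = json.dumps({'index': {'number_of_replicas': replicas}})
    return 'PUT', path, body


method, path, body = replica_settings_request('todo', 1)
print(method, path, body)
```

The request still has to be SigV4-signed for an Amazon OpenSearch Service domain. The `post_data_to_opensearch` helper from the issue accepts a `method` argument, so it could plausibly send this with `method='PUT'`. The number of primary shards is fixed when an index is created, so reducing shards generally means creating a new index and reindexing rather than updating a setting in place.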

cwomack commented 12 months ago

@mithun35h, great to hear you got this sorted out! If there are further issues, feel free to comment back or open a new issue (if related to another error/problem).