electric-sql / pglite

Lightweight WASM Postgres with real-time, reactive bindings.
https://pglite.dev
Apache License 2.0
8.06k stars 154 forks source link

Sync plugin: TypeError: Cannot read properties of undefined (reading 'debug') #232

Closed josephwinston closed 2 weeks ago

josephwinston commented 4 weeks ago

I am attempting to follow: https://github.com/electric-sql/pglite/tree/1de36fef444f3c0030216809c1226dae7c15230d/packages/pglite-sync/example

package.json

{
  "name": "sync",
  "version": "1.0.0",
  "description": "",
  "type": "module",
  "main": "main.js",
  "dependencies": {
    "@electric-sql/pglite": "^0.2.2",
    "@electric-sql/pglite-sync": "^0.2.2"
  },
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "private": true,
  "devDependencies": {
    "prettier": "^3.3.3"
  }
}

main.js

import { PGlite } from "@electric-sql/pglite";
import { electricSync } from "@electric-sql/pglite-sync";

const pg = await PGlite.create({
  extensions: {
    electric: electricSync(),
  },
});

await pg.exec(`
  CREATE TABLE IF NOT EXISTS todo (
    id SERIAL PRIMARY KEY,
    task TEXT,
    done BOOLEAN
  );
`);

$ node main.js

file:///home/josephwinston/src/Prototypes/PGlite/Sync/node_modules/@electric-sql/pglite-sync/dist/index.js:1
var P=Object.defineProperty,d=Object.getOwnPropertySymbols,O=Object.prototype.hasOwnProperty,w=Object.prototype.propertyIsEnumerable,S=(e,t,s)=>t in e?P(e,t,{enumerable:!0,configurable:!0,writable:!0,value:s}):e[t]=s,T=(e,t)=>{for(var s in t||(t={}))O.call(t,s)&&S(e,s,t[s]);if(d)for(var s of d(t))w.call(t,s)&&S(e,s,t[s]);return e},U=(e,t)=>{var s={};for(var r in e)O.call(e,r)&&t.indexOf(r)<0&&(s[r]=e[r]);if(e!=null&&d)for(var r of d(e))t.indexOf(r)<0&&w.call(e,r)&&(s[r]=e[r]);return s},m=(e,t,s)=>new Promise((r,o)=>{var n=c=>{try{a(s.next(c))}catch(h){o(h)}},i=c=>{try{a(s.throw(c))}catch(h){o(h)}},a=c=>c.done?r(c.value):Promise.resolve(c.value).then(n,i);a((s=s.apply(e,t)).next())}),b=e=>Number(e),I=e=>e==="true"||e==="t",_=e=>BigInt(e),v=e=>JSON.parse(e),j={int2:b,int4:b,int8:_,bool:I,float4:b,float8:b,json:v,jsonb:v};function C(e,t){let s=0,r=null,o="",n=!1,i=0,a;function c(h){let l=[];for(;s<h.length;s++){if(r=h[s],n)r==="\\"?o+=h[++s]:r==='"'?(l.push(t?t(o):o),o="",n=h[s+1]==='"',i=s+2):o+=r;else if(r==='"')n=!0;else if(r==="{")i=++s,l.push(c(h));else if(r==="}"){n=!1,i<s&&l.push(t?t(h.slice(i,s)):h.slice(i,s)),i=s+1;break}else r===","&&a!=="}"&&a!=='"'&&(l.push(t?t(h.slice(i,s)):h.slice(i,s)),i=s+1);a=r}return i<s&&l.push(t?t(h.slice(i,s+1)):h.slice(i,s+1)),l}return c(e)[0]}var k=class{constructor(e){this.parser=T(T({},j),e)}parse(e,t){return JSON.parse(e,(s,r)=>{if(s==="value"&&typeof r=="object"){let o=r;Object.keys(o).forEach(n=>{o[n]=this.parseRow(n,o[n],t)})}return r})}parseRow(e,t,s){let r=s[e];if(!r)return t;let o=this.parser[r.type],n=r,{type:i,dims:a}=n,c=U(n,["type","dims"]);return a>0?C(t,o??(l=>l)):o?o(t,c):t}},$={initialDelay:100,maxDelay:1e4,multiplier:1.3},A=class{constructor(e){this.messageQueue=[],this.isProcessing=!1,this.callback=e}process(e){this.messageQueue.push(e),this.isProcessing||this.processQueue()}processQueue(){return m(this,null,function*(){for(this.isProcessing=!0;this.messageQueue.length>0;){let e=this.messageQueue.shift();yield 
this.callback(e)}this.isProcessing=!1})}},g=class E extends Error{constructor(t,s,r,o,n,i){super(i||`HTTP Error ${t} at ${n}: ${s??JSON.stringify(r)}`),this.url=n,this.name="FetchError",this.status=t,this.text=s,this.json=r,this.headers=o}static fromResponse(t,s){return m(this,null,function*(){let r=t.status,o=Object.fromEntries([...t.headers.entries()]),n,i,a=t.headers.get("content-type");return a&&a.includes("application/json")?i=yield t.json():n=yield t.text(),new E(r,n,i,o,s)})}},D=class{constructor(e){this.subscribers=new Map,this.upToDateSubscribers=new Map,this.isUpToDate=!1;var t,s,r;this.validateOptions(e),this.options=T({subscribe:!0},e),this.lastOffset=(t=this.options.offset)!=null?t:"-1",this.shapeId=this.options.shapeId,this.messageParser=new k(e.parser),this.backoffOptions=(s=e.backoffOptions)!=null?s:$,this.fetchClient=(r=e.fetchClient)!=null?r:(...o)=>fetch(...o),this.start()}start(){return m(this,null,function*(){var e,t;this.isUpToDate=!1;let{url:s,where:r,signal:o}=this.options;for(;!o?.aborted&&!this.isUpToDate||this.options.subscribe;){let n=new URL(s);r&&n.searchParams.set("where",r),n.searchParams.set("offset",this.lastOffset),this.isUpToDate&&n.searchParams.set("live","true"),this.shapeId&&n.searchParams.set("shape_id",this.shapeId);let i;try{let u=yield this.fetchWithBackoff(n);if(u)i=u;else break}catch(u){if(!(u instanceof g))throw u;if(u.status==409){let M=u.headers["x-electric-shape-id"];this.reset(M),this.publish(u.json);continue}else if(u.status>=400&&u.status<500)throw this.sendErrorToUpToDateSubscribers(u),this.sendErrorToSubscribers(u),u}let{headers:a,status:c}=i,h=a.get("X-Electric-Shape-Id");h&&(this.shapeId=h);let l=a.get("X-Electric-Chunk-Last-Offset");l&&(this.lastOffset=l);let f=()=>{let u=a.get("X-Electric-Schema");return u?JSON.parse(u):{}};this.schema=(e=this.schema)!=null?e:f();let y=c===204?"[]":yield i.text(),p=this.messageParser.parse(y,this.schema);p.length>0&&(((t=p[p.length-1].headers)==null?void 
0:t.control)==="up-to-date"&&!this.isUpToDate&&(this.isUpToDate=!0,this.notifyUpToDateSubscribers()),this.publish(p))}})}subscribe(e,t){let s=Math.random(),r=new A(e);return this.subscribers.set(s,[r,t]),()=>{this.subscribers.delete(s)}}unsubscribeAll(){this.subscribers.clear()}publish(e){this.subscribers.forEach(([t,s])=>{t.process(e)})}sendErrorToSubscribers(e){this.subscribers.forEach(([t,s])=>{s?.(e)})}subscribeOnceToUpToDate(e,t){let s=Math.random();return this.upToDateSubscribers.set(s,[e,t]),()=>{this.upToDateSubscribers.delete(s)}}unsubscribeAllUpToDateSubscribers(){this.upToDateSubscribers.clear()}notifyUpToDateSubscribers(){this.upToDateSubscribers.forEach(([e])=>{e()})}sendErrorToUpToDateSubscribers(e){this.upToDateSubscribers.forEach(([t,s])=>s(e))}reset(e){this.lastOffset="-1",this.shapeId=e,this.isUpToDate=!1,this.schema=void 0}validateOptions(e){if(!e.url)throw new Error("Invalid shape option. It must provide the url");if(e.signal&&!(e.signal instanceof AbortSignal))throw new Error("Invalid signal option. It must be an instance of AbortSignal.");if(e.offset!==void 0&&e.offset!=="-1"&&!e.shapeId)throw new Error("shapeId is required if this isn't an initial fetch (i.e. 
offset > -1)")}fetchWithBackoff(e){return m(this,null,function*(){let{initialDelay:t,maxDelay:s,multiplier:r}=this.backoffOptions,o=this.options.signal,n=t,i=0;for(;;)try{let a=yield this.fetchClient(e.toString(),{signal:o});if(a.ok)return a;throw yield g.fromResponse(a,e.toString())}catch(a){if(o?.aborted)return;if(a instanceof g&&a.status>=400&&a.status<500)throw a;yield new Promise(c=>setTimeout(c,n)),n=Math.min(n*r,s),i++,console.log(`Retry attempt #${i} after ${n}ms`)}})}};async function R(e,t){let s=t.debug||!1,r=[];return{namespaceObj:{syncShapeToTable:async i=>{let a=new AbortController;i.signal&&i.signal.addEventListener("abort",()=>a.abort(),{once:!0});let c=new D({...i,signal:a.signal});return c.subscribe(async l=>{s&&console.log("sync messages received",l);for(let f of l)e.transaction(async y=>{await x({pg:y,rawMessage:f,table:i.table,schema:i.schema,mapColumns:i.mapColumns,primaryKey:i.primaryKey,debug:s})})}),r.push({stream:c,aborter:a}),{unsubsribe:()=>{c.unsubscribeAll(),a.abort()},get isUpToDate(){return c.isUpToDate},get shapeId(){return c.shapeId},get lastOffset(){return c.lastOffset},subscribeOnceToUpToDate:(l,f)=>c.subscribeOnceToUpToDate(l,f),unsubscribeAllUpToDateSubscribers:()=>{c.unsubscribeAllUpToDateSubscribers()}}}},close:async()=>{for(let{stream:i,aborter:a}of r)i.unsubscribeAll(),a.abort()}}}function G(e){return{name:"ElectricSQL Sync",setup:async t=>{let{namespaceObj:s,close:r}=await R(t,e);return{namespaceObj:s,close:r}}}}function N(e,t){if(typeof e=="function")return e(t);{let s={};for(let[r,o]of Object.entries(e))s[r]=t.value[o];return s}}async function x({pg:e,table:t,schema:s="public",rawMessage:r,mapColumns:o,primaryKey:n,debug:i}){if(!r.headers.action)return;let a=r,c=o?N(o,a):a.value;if(a.headers.action==="insert"){i&&console.log("inserting",c);let h=Object.keys(c);await e.query(`

TypeError: Cannot read properties of undefined (reading 'debug')
    at R (file:///home/josephwinston/src/Prototypes/PGlite/Sync/node_modules/@electric-sql/pglite-sync/dist/index.js:1:5694)
    at Object.setup (file:///home/josephwinston/src/Prototypes/PGlite/Sync/node_modules/@electric-sql/pglite-sync/dist/index.js:1:6619)
    at Dt.Tr (file:///home/josephwinston/src/Prototypes/PGlite/Sync/node_modules/@electric-sql/pglite/dist/index.js:1:53651)
    at async Dt.create (file:///home/josephwinston/src/Prototypes/PGlite/Sync/node_modules/@electric-sql/pglite/dist/index.js:1:49540)
    at async file:///home/josephwinston/src/Prototypes/PGlite/Sync/main.js:4:12

Node.js v20.16.0

Please point me in the correct direction.

samwillis commented 4 weeks ago

Thanks for the report; it looks like we have a bug in the extension. Until we release a fix, you can use the workaround below. Note the empty object passed to `electricSync`:

const pg = await PGlite.create({
  extensions: {
    electric: electricSync({}),
  },
});

Note to self: the fix is:

  const debug = options?.debug ?? false

https://github.com/electric-sql/pglite/blob/e1332d0902e26b81039fc36fbf8f4c766a6c71ea/packages/pglite-sync/src/index.ts#L25C24-L25C25

josephwinston commented 4 weeks ago

Works for me with the empty object

ba-ppp commented 3 weeks ago
const pg = await PGlite.create({
  extensions: {
    live, electric: electricSync({ debug: true }),
  }
})

You need to pass the `debug` option to `electricSync` here.