Nerixyz / instagram_mqtt

Realtime and Push Notification (FBNS) support for the instagram-private-api
MIT License

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close #75

Closed covein closed 3 years ago

covein commented 3 years ago

The goal is to get a notification when a profile I follow starts a live stream, and when someone sends me a direct message. I am a beginner, so I may have made some basic mistakes; sorry for that.

I log in with account A.

Problem 1: I use example/push.example.ts and realtime.example.ts, but they always fail with a 'Premature close' error, and when I use account B to send a message to account A, no message shows up in the console.

Problem 2: I use readState(ig) and saveState(ig), but 'new activity' in the Instagram app shows multiple login activities. I expected only one, because after the first login from a never-used device, later runs should reuse the cookies from state.json.

  ig:mqtt:realtime Subscribing with Skywalker to ig/u/v1/******, ig/live_notification_subscribe/******** +0ms
  ig:mqtt:realtime Iris Sub to: seqId: ****, snapshot: ****** +0ms
  ig:mqtt:mqttot edge-mqtt.facebook.com: Error: Premature close
  ig:mqtt:mqttot        Stack: Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
  ig:mqtt:mqttot     at new NodeError (node:internal/errors:329:5)
  ig:mqtt:mqttot     at MqttTransformer.onclose (node:internal/streams/end-of-stream:138:38)
  ig:mqtt:mqttot     at MqttTransformer.emit (node:events:381:22)
  ig:mqtt:mqttot     at MqttTransformer.emit (node:domain:470:12)
  ig:mqtt:mqttot     at emitCloseNT (node:internal/streams/destroy:169:10)
  ig:mqtt:mqttot     at processTicksAndRejections (node:internal/process/task_queues:80:21) +0ms
  ig:mqtt:mqttot edge-mqtt.facebook.com: Warning: _a.check is not a function
  ig:mqtt:mqttot        Stack: TypeError: _a.check is not a function
  ig:mqtt:mqttot     at MQTToTClient.setDisconnected (/project/path/node_modules/mqtts/src/mqtt.client.ts:475:55)
  ig:mqtt:mqttot     at /project/path/node_modules/mqtts/src/mqtt.client.ts:175:33
  ig:mqtt:mqttot     at node:internal/util:408:5
  ig:mqtt:mqttot     at finish (node:internal/streams/pipeline:163:7)
  ig:mqtt:mqttot     at node:internal/util:408:5
  ig:mqtt:mqttot     at node:internal/streams/pipeline:74:5
  ig:mqtt:mqttot     at finish (node:internal/streams/pipeline:159:23)
  ig:mqtt:mqttot     at node:internal/util:408:5
  ig:mqtt:mqttot     at node:internal/streams/pipeline:74:5
  ig:mqtt:mqttot     at finish (node:internal/streams/pipeline:159:23)
  ig:mqtt:mqttot     at node:internal/util:408:5
  ig:mqtt:mqttot     at MqttTransformer.<anonymous> (node:internal/streams/pipeline:66:7)
  ig:mqtt:mqttot     at MqttTransformer.<anonymous> (node:internal/util:408:5)
  ig:mqtt:mqttot     at MqttTransformer.onclose (node:internal/streams/end-of-stream:138:25)
  ig:mqtt:mqttot     at MqttTransformer.emit (node:events:381:22)
  ig:mqtt:mqttot     at MqttTransformer.emit (node:domain:470:12)
  ig:mqtt:mqttot     at emitCloseNT (node:internal/streams/destroy:169:10)
  ig:mqtt:mqttot     at processTicksAndRejections (node:internal/process/task_queues:80:21) +0ms
import 'dotenv/config';
import { IgApiClient } from 'instagram-private-api';
import { IgApiClientExt, withRealtime } from 'instagram_mqtt';
import { GraphQLSubscriptions } from 'instagram_mqtt';
import { SkywalkerSubscriptions } from 'instagram_mqtt';
import { promisify } from 'util';
import { writeFile, readFile, exists } from 'fs';

const writeFileAsync = promisify(writeFile);
const readFileAsync = promisify(readFile);
const existsAsync = promisify(exists);

async function saveState(ig: IgApiClientExt) {
    return writeFileAsync('state.json', await ig.exportState(), {encoding: 'utf8'});
}

async function readState(ig: IgApiClientExt) {
    if (!await existsAsync('state.json'))
        return;
    await ig.importState(await readFileAsync('state.json', {encoding: 'utf8'}));
}

async function loginToInstagram(ig: IgApiClientExt) {
    ig.request.end$.subscribe(() => saveState(ig));
    await ig.account.login(process.env.IG_USERNAME, process.env.IG_PASSWORD);
}

(async () => {
    const ig = withRealtime(new IgApiClient());
    ig.state.generateDevice(process.env.IG_USERNAME);
    ig.state.proxyUrl = process.env.IG_PROXY;

    // this will set the auth and the cookies for instagram
    await readState(ig);

    // this logs the client in
    // await ig.simulate.preLoginFlow();
    await loginToInstagram(ig);

    await ig.realtime.connect({
        irisData: await ig.feed.directInbox().request(),
        socksOptions: {
            type: 5,
            port: 6153,
            host: '127.0.0.1'
        }
    });

    function logEvent(name: string) {
        return (data: any) => console.log(name, data);
    }

    ig.realtime.on('message', (message) => console.log(message));
    ig.realtime.on('receive', (topic, messages) => {
        console.log('receive', topic, messages);
    });
    ig.realtime.on('error', console.error);
    ig.realtime.on('direct', logEvent('direct'));
})();
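
One detail in the script above that may be worth ruling out: the event listeners are attached only after `await ig.realtime.connect(...)` has returned. With a Node.js `EventEmitter`, an event emitted before a listener is attached is never delivered to that listener. The sketch below illustrates this with a hypothetical `FakeRealtime` class (not part of instagram_mqtt). It is not claimed to be the cause of the 'Premature close' error, only a reason to register handlers before connecting.

```typescript
import { EventEmitter } from 'events';

// Hypothetical stand-in for a realtime client: it emits one event while connecting.
class FakeRealtime extends EventEmitter {
    connect(): void {
        this.emit('receive', 'topic', ['early message']);
    }
}

// Listener attached after connect(): the early event is never observed.
function listenAfterConnect(): string[] {
    const seen: string[] = [];
    const client = new FakeRealtime();
    client.connect();
    client.on('receive', (topic: string) => {
        seen.push(topic);
    });
    return seen;
}

// Listener attached before connect(): the early event is observed.
function listenBeforeConnect(): string[] {
    const seen: string[] = [];
    const client = new FakeRealtime();
    client.on('receive', (topic: string) => {
        seen.push(topic);
    });
    client.connect();
    return seen;
}
```

Applied to the script above, this would mean moving the `ig.realtime.on(...)` calls ahead of `ig.realtime.connect(...)`.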
utopyin commented 3 years ago

Same error for me.

0x0arash commented 3 years ago

I'm getting the same error... a fix or some kind of direction would be appreciated very much.

theshubhampanchal commented 3 years ago

I am also getting the same error.

I have tried lots of things, but none of them solved it:

  1. Switching to another account
  2. Changing Node versions
  3. Running on both Linux and Windows

I still get the same error.

theshubhampanchal commented 3 years ago

Your second problem has a small fix.

Replace your code with this:

import 'dotenv/config';
import { IgApiClient } from 'instagram-private-api';
import { IgApiClientExt, withRealtime } from 'instagram_mqtt';
import { GraphQLSubscriptions } from 'instagram_mqtt';
import { SkywalkerSubscriptions } from 'instagram_mqtt';
import { promisify } from 'util';
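// saveState() and readState() below call fs.existsSync / fs.mkdirSync, so the namespace import is needed
import * as fs from 'fs';
import { writeFile, readFile, exists } from 'fs';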

const writeFileAsync = promisify(writeFile);
const readFileAsync = promisify(readFile);
const existsAsync = promisify(exists);

async function saveState(ig: IgApiClientExt,username: string) {
    var dir = 'sessions/';
    var path = dir+username+'_state.json';
    if (!fs.existsSync(dir)){ fs.mkdirSync(dir); }
    return writeFileAsync(path,await ig.exportState(), {encoding: 'utf8'});
}

async function readState(ig: IgApiClientExt,username: string) {
    var dir = 'sessions/';
    var path = dir+username+'_state.json';
    if (!fs.existsSync(dir)){ fs.mkdirSync(dir); }
    if (!await existsAsync(path))
        return false;
    await ig.importState(await readFileAsync(path, {encoding: 'utf8'}));
    return true;
}

async function loginToInstagram(ig: IgApiClientExt,username: string,password: string) {
        ig.state.generateDevice(username);
        ig.request.end$.subscribe(() => saveState(ig,username));
        var response;
        if(!await readState(ig,username)) {
            await ig.simulate.preLoginFlow();
            response = await ig.account.login(username,password);
        } else {
            try {
                response = await ig.account.currentUser();
            } catch (e) {
                response = await ig.account.login(username,password);
            }
        }
        return response;
}

(async () => {
    const ig = withRealtime(new IgApiClient());
    ig.state.proxyUrl = process.env.IG_PROXY;

    var userData = await loginToInstagram(ig,process.env.IG_USERNAME, process.env.IG_PASSWORD);  

    await ig.realtime.connect({
        irisData: await ig.feed.directInbox().request(),
        socksOptions: {
            type: 5,
            port: 6153,
            host: '127.0.0.1'
        }
    });

    function logEvent(name: string) {
        return (data: any) => console.log(name, data);
    }

    ig.realtime.on('message', (message) => console.log(message));
    ig.realtime.on('receive', (topic, messages) => {
        console.log('receive', topic, messages);
    });
    ig.realtime.on('error', console.error);
    ig.realtime.on('direct', logEvent('direct'));
})();
covein commented 3 years ago

@theshubhampanchal thanks!

theshubhampanchal commented 3 years ago

@covein please check the updated code; I have changed it. It no longer logs in every time you run it. It restores the session from the state file, and if that session is no longer valid, it logs in again with the credentials and saves the new session to the state file.
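
The restore-then-fall-back flow described here can be sketched independently of any Instagram library. In the minimal sketch below, `SessionSteps` and `restoreOrLogin` are hypothetical names introduced for illustration; in the code posted earlier, the three steps correspond roughly to `readState()`, `ig.account.currentUser()` and `ig.account.login()`.

```typescript
// Hypothetical, library-independent description of the three steps.
interface SessionSteps<User> {
    // Resolves to true if a saved state file was found and loaded.
    restore: () => Promise<boolean>;
    // Rejects if the restored session is no longer accepted by the server.
    validate: () => Promise<User>;
    // Performs a full login with credentials.
    login: () => Promise<User>;
}

async function restoreOrLogin<User>(steps: SessionSteps<User>): Promise<User> {
    // No saved state: a full login is the only option.
    if (!(await steps.restore())) {
        return steps.login();
    }
    try {
        // Saved state loaded: check that it still works before trusting it.
        return await steps.validate();
    } catch {
        // Restored session was rejected: fall back to a full login.
        return steps.login();
    }
}
```

With this shape, a full login (and so a new login activity) should only happen on the first run or after the saved session has expired.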

covein commented 3 years ago

@theshubhampanchal thank you very much. I am still waiting for @Nerixyz to have a look at the problem. I think the code is fine, but I still cannot get the notifications.

Nerixyz commented 3 years ago

There were some updates to mqtts, not sure if that broke something.

covein commented 3 years ago

> There were some updates to mqtts, not sure if that broke something.

I want to use this in a small project of mine soon. It would be nice if you could update the code. Thanks in advance.