github / gh-ost

GitHub's Online Schema-migration Tool for MySQL
MIT License
12.31k stars 1.25k forks source link

Add binlog compression support #1411

Closed fulldb closed 5 months ago

fulldb commented 5 months ago

A Pull Request should be associated with an Issue.

We wish to have discussions in Issues. A single issue may be targeted by multiple PRs. If you're offering a new feature or fixing anything, we'd like to know beforehand in Issues, and potentially we'll be able to point development in a particular direction.

Related issue: https://github.com/github/gh-ost/issues/0123456789

Further notes are in https://github.com/github/gh-ost/blob/master/.github/CONTRIBUTING.md. Thank you! We are open to PRs, but please understand if, for technical reasons, we are unable to accept each and every PR.

Description

This PR [briefly explain what it does]

In case this PR introduced Go code changes:

fulldb commented 5 months ago

This code has serious issues and should be rejected.

fulldb commented 5 months ago

https://github.com/fulldb/gh-ost/blob/binlog-compress/go/binlog/gomysql_reader.go#L180

// handleTransactionPayloadEvent processes a TransactionPayloadEvent by walking
// the events nested in rowsEvents.Events and converting every RowsEvent found
// there into BinlogEntry values, which are sent on entriesChannel. Nested
// events of any other type are ignored.
//
// Every emitted entry is created at this.currentCoordinates. When the whole
// payload has been processed, this.LastAppliedRowsEventHint is advanced to
// those coordinates so that the same payload is skipped if seen again.
//
// It returns an error when:
//   - the current coordinates overflow the 4-byte log position relative to
//     the last applied hint;
//   - a nested rows event has an event type that ToEventDML does not map to a
//     DML kind;
//   - an update rows event does not hold an even number of row images.
//
// It returns nil when the payload was already handled (skipped) or was fully
// processed.
func (this *GoMySQLReader) handleTransactionPayloadEvent(rowsEvents *replication.TransactionPayloadEvent, entriesChannel chan<- *BinlogEntry) (err error) {
    if this.currentCoordinates.IsLogPosOverflowBeyond4Bytes(&this.LastAppliedRowsEventHint) {
        return fmt.Errorf("Unexpected rows event at %+v, the binlog end_log_pos is overflow 4 bytes", this.currentCoordinates)
    }
    if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) {
        this.migrationContext.Log.Debugf("Skipping handled query at %+v", this.currentCoordinates)
        return nil
    }
    for _, ev := range rowsEvents.Events {
        // Only row-based DML events are of interest; everything else nested
        // in the payload is skipped.
        rowsEvent, ok := ev.Event.(*replication.RowsEvent)
        if !ok {
            continue
        }
        eventType := ev.Header.EventType.String()
        dml := ToEventDML(eventType)
        if dml == NotDML {
            return fmt.Errorf("Unknown DML type: %s", eventType)
        }
        // An update stores its rows as consecutive (WHERE, SET) pairs. Reject
        // an odd count up front instead of panicking on Rows[i+1] below.
        if dml == UpdateDML && len(rowsEvent.Rows)%2 != 0 {
            return fmt.Errorf("Unexpected update rows event at %+v: got %d row images, expected WHERE+SET pairs", this.currentCoordinates, len(rowsEvent.Rows))
        }
        for i, row := range rowsEvent.Rows {
            if dml == UpdateDML && i%2 == 1 {
                // An update has two rows (WHERE+SET)
                // We do both at the same time
                continue
            }
            binlogEntry := NewBinlogEntryAt(this.currentCoordinates)
            binlogEntry.DmlEvent = NewBinlogDMLEvent(
                string(rowsEvent.Table.Schema),
                string(rowsEvent.Table.Table),
                dml,
            )
            switch dml {
            case InsertDML:
                binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
            case UpdateDML:
                binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
                binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1])
            case DeleteDML:
                binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
            }
            // The channel will do the throttling. Whoever is reading from the channel
            // decides whether action is taken synchronously (meaning we wait before
            // next iteration) or asynchronously (we keep pushing more events)
            // In reality, reads will be synchronous
            entriesChannel <- binlogEntry
        }
    }
    this.LastAppliedRowsEventHint = this.currentCoordinates
    return nil
}
fulldb commented 5 months ago

https://github.com/fulldb/gh-ost/blob/binlog-compress/go/binlog/gomysql_reader.go#L163

        case *replication.TransactionPayloadEvent:
            if err := this.handleTransactionPayloadEvent(binlogEvent, entriesChannel); err != nil {
                return err
            }
        }