Closed Klapeyron closed 3 weeks ago
The task involves updating the indexer to handle new types of events or additional data fields as specified in the pull request. This requires modifications to several files to ensure that the new events are correctly processed and stored.
src/processor.ts
EventProcessorProvider
to handle new event types.SubstrateBatchProcessor
configuration to include the new event types.import { SubstrateBatchProcessor } from "@subsquid/substrate-processor";
import { EventProcessorProvider } from "./eventprocessor/eventProcessorProvider";
const provider = new EventProcessorProvider();
export const processor = new SubstrateBatchProcessor()
.setDataSource({
chain: process.env.DATA_SOURCE_CHAIN as string,
})
.addEvent({
name: provider.getAllProcessesNames(),
call: true,
})
.setFields({
event: {},
block: { timestamp: true },
});
// Add specific new events if needed
processor.addEvent({
name: 'NewEventType',
call: true,
});
src/eventprocessor/market/marketClosedEventProcessor.ts
process
method to handle new fields or changes in the marketClosed
event structure.import { EventProcessor } from "../eventProcessor";
import { Store } from "@subsquid/typeorm-store";
import { market } from "../../types/events";
import { EntityManager } from "typeorm";
import {
DataHandlerContext,
Block,
Event,
} from "@subsquid/substrate-processor";
import { Market, MarketStatus } from "../../model";
export class MarketClosedEventProcessor implements EventProcessor {
getHandledEventName(): string {
return "Market.MarketClosed";
}
async process(
ctx: DataHandlerContext<Store, any>,
block: Block<any>,
event: Event,
) {
const marketClosedEvent = market.marketClosed.v7;
if (marketClosedEvent.is(event)) {
let parsedEvent = marketClosedEvent.decode(event);
let market = await ctx.store.findOne(Market, {
where: { id: parsedEvent.marketId.toString() },
});
if (market) {
market.status = MarketStatus.CLOSE;
market.lifetime = BigInt(block.header.height);
// Handle additional fields if any
if (parsedEvent.additionalField) {
market.additionalField = parsedEvent.additionalField;
}
await ctx.store.save(market);
}
} else {
console.error("Unsupported spec");
}
}
}
src/eventprocessor/market/positionCreatedEventProcessor.ts
import { EventProcessor } from '../eventProcessor';
import { PositionCreatedEvent } from '../../types/market/events';
class PositionCreatedEventProcessor implements EventProcessor {
async process(event: PositionCreatedEvent): Promise<void> {
try {
const { positionId, marketId, userId, amount, timestamp } = event;
console.log(`Processing position created event:
Position ID: ${positionId},
Market ID: ${marketId},
User ID: ${userId},
Amount: ${amount},
Timestamp: ${timestamp}`);
} catch (error) {
console.error('Error processing position created event:', error);
}
}
}
export default PositionCreatedEventProcessor;
src/types/market/events.ts
import { sts, Block, Bytes, Option, Result, EventType, RuntimeCtx } from '../support';
import * as v2 from '../v2';
export const marketCreated = {
name: 'Market.MarketCreated',
v2: new EventType(
'Market.MarketCreated',
sts.struct({
marketId: sts.bigint(),
ticker: sts.bytes(),
tickSize: sts.bigint(),
lifetime: sts.number(),
initialMargin: v2.Percent,
maintenanceMargin: v2.Percent,
contractUnit: v2.ContractUnit,
creator: v2.AccountId32,
creationTimestamp: sts.number(),
})
),
};
export const marketUpdated = {
name: 'Market.MarketUpdated',
v2: new EventType(
'Market.MarketUpdated',
sts.struct({
marketId: sts.bigint(),
newTicker: sts.bytes(),
newTickSize: sts.bigint(),
newLifetime: sts.number(),
newInitialMargin: v2.Percent,
newMaintenanceMargin: v2.Percent,
newContractUnit: v2.ContractUnit,
})
),
};
export const marketRemoved = {
name: 'Market.MarketRemoved',
v2: new EventType(
'Market.MarketRemoved',
sts.struct({
marketId: sts.bigint(),
})
),
};
export const marketClosed = {
name: 'Market.MarketClosed',
v7: new EventType(
'Market.MarketClosed',
sts.struct({
marketId: sts.bigint(),
})
),
};
src/eventprocessor/market/marketCreatedEventProcessor.ts
MarketCreatedEventProcessor
to handle new fields in the marketCreated
event.import { EventProcessor } from "../eventProcessor";
import { Store } from "@subsquid/typeorm-store";
import { market } from "../../types/events";
import { Market, MarketStatus } from "../../model";
import {
DataHandlerContext,
Block,
Event,
} from "@subsquid/substrate-processor";
import { encodeMarketTicker } from "../../utils/encodersUtils";
import { BigDecimal } from "@subsquid/big-decimal";
import { USDC_DECIMALS } from "../../utils";
export class MarketCreatedEventProcessor implements EventProcessor {
getHandledEventName(): string {
return "Market.MarketCreated";
}
async process(
ctx: DataHandlerContext<Store, any>,
block: Block<any>,
event: Event,
) {
let receivedEvent = market.marketCreated.v2;
if (receivedEvent.is(event)) {
const decodedEvent = receivedEvent.decode(event);
const createdMarket = new Market({
id: decodedEvent.marketId.toString(),
ticker: encodeMarketTicker(decodedEvent.ticker),
tickSize: BigDecimal(decodedEvent.tickSize, USDC_DECIMALS),
lifetime: BigInt(decodedEvent.lifetime),
initialMargin: decodedEvent.initialMargin,
maintenanceMargin: decodedEvent.maintenanceMargin,
contractUnit: BigDecimal(
decodedEvent.contractUnit.contractUnit,
decodedEvent.contractUnit.decimals,
),
blockHeight: BigInt(block.header.height),
timestamp: new Date(block.header.timestamp),
dailyVolume: BigInt(0),
status: MarketStatus.OPEN,
creator: decodedEvent.creator,
marketType: decodedEvent.marketType,
});
await ctx.store.save(createdMarket);
} else {
console.error("Unsupported spec");
}
}
}
src/eventprocessor/eventProcessorProvider.ts
import { EventProcessor } from "./eventProcessor";
import { MarketCreatedEventProcessor } from "./market/marketCreatedEventProcessor";
import { OrderCreatedEventProcessor } from "./market/orderCreatedEventProcessor";
import { PositionCreatedEventProcessor } from "./market/positionCreatedEventProcessor";
import { OrderCanceledEventProcessor } from "./market/orderCanceledEventProcessor";
import { OrderFilledEventProcessor } from "./market/orderFilledEventProcessor";
import { OrderReducedEventProcessor } from "./market/orderReducedEventProcessor";
import { MarketClosedEventProcessor } from "./market/marketClosedEventProcessor";
import { MarketRemovedEventProcessor } from "./market/marketRemovedEventProcessor";
import { PositionClosedEventProcessor } from "./market/positionClosedEventProcessor";
import { PositionReducedEventProcessor } from "./market/positionReducedEventProcessor";
import { PositionMarkedToMarketEventProcessor } from "./market/positionMarkedToMarketEventProcessor";
import { LatestOraclePriceProcessor } from "./market/latestOraclePriceProcessor";
import { OrderExtendedEventProcessor } from "./market/orderExtendedEventProcessor";
import { WithdrawRequestedEventProcessor } from "./bridge/withdrawRequestedEventProcessor";
import { WithdrawCanceledEventProcessor } from "./bridge/withdrawCanceledEventProcess";
import { WithdrawApprovedEventProcessor } from "./bridge/withdrawApprovedEventProcessor";
import { ReserveRepatriatedEventProcessor } from "./balances/reserveRepatriatedEventProcessor";
import { MarketUpdatedEventProcessor } from "./market/marketUpdatedEventProcessor";
const processors: EventProcessor[] = [
new MarketCreatedEventProcessor(),
new OrderCreatedEventProcessor(),
new PositionCreatedEventProcessor(),
new OrderCanceledEventProcessor(),
new OrderFilledEventProcessor(),
new OrderReducedEventProcessor(),
new MarketClosedEventProcessor(),
new MarketRemovedEventProcessor(),
new PositionClosedEventProcessor(),
new PositionReducedEventProcessor(),
new PositionMarkedToMarketEventProcessor(),
new LatestOraclePriceProcessor(),
new OrderExtendedEventProcessor(),
new WithdrawCanceledEventProcessor(),
new WithdrawRequestedEventProcessor(),
new WithdrawApprovedEventProcessor(),
new ReserveRepatriatedEventProcessor(),
new MarketUpdatedEventProcessor(),
];
export class EventProcessorProvider {
processorMap = new Map<String, EventProcessor>();
constructor() {
processors.map((processor) => {
this.processorMap.set(processor.getHandledEventName(), processor);
});
}
getProcessorByName(name: String): EventProcessor | undefined {
return this.processorMap.get(name);
}
getEventProcessors(): EventProcessor[] {
return processors;
}
getAllProcessesNames(): string[] {
return processors.map((p) => p.getHandledEventName());
}
}
src/eventprocessor/market/orderCreatedEventProcessor.ts
OrderCreatedEventProcessor
to handle new fields in the orderCreated
event.import { EventProcessor } from "../eventProcessor";
import { Store } from "@subsquid/typeorm-store";
import {
ClosingOrder,
Market,
OpeningOrder,
Order,
OrderSide,
OrderStatus,
OrderType,
} from "../../model";
import { AggregatedOrdersHandler } from "./aggregatedOrdersHandler";
import {
DataHandlerContext,
Block,
Event,
Call,
} from "@subsquid/substrate-processor";
import * as events from "../../types/events";
import { encodeUserValue } from "../../utils/encodersUtils";
import { BigDecimal } from "@subsquid/big-decimal";
import { USDC_DECIMALS } from "../../utils";
export class OrderCreatedEventProcessor implements EventProcessor {
getHandledEventName(): string {
return "Market.OrderCreated";
}
callName = "Market.create_order";
async process(
ctx: DataHandlerContext<Store, any>,
block: Block<any>,
event: Event,
call: Call,
) {
const orderCreatedEvent = events.market.orderCreated.v2;
if (orderCreatedEvent.is(event)) {
const parsedEvent = orderCreatedEvent.decode(event);
const market = await ctx.store.get(Market, parsedEvent.market.toString());
const quantity = parsedEvent.quantity;
let order: Order;
order = new Order({
id: parsedEvent.orderId.toString(),
market: market,
price: BigDecimal(parsedEvent.price, USDC_DECIMALS),
side:
parsedEvent.side.__kind === "Long" ? OrderSide.LONG : OrderSide.SHORT,
quantity: BigInt(quantity),
initialQuantity: BigInt(quantity),
who: encodeUserValue(parsedEvent.who),
blockHeight: BigInt(block.header.height),
timestamp: new Date(block.header.timestamp),
status: OrderStatus.ACTIVE,
type:
parsedEvent.orderType.__kind === "Opening"
? new OpeningOrder({ type: "OpeningOrder" })
: new ClosingOrder({
type: "ClosingOrder",
value: parsedEvent.orderType.value,
}),
newField1: parsedEvent.newField1,
newField2: parsedEvent.newField2,
});
await ctx.store.save(order);
await AggregatedOrdersHandler.addNewOrderToTheAggregatedOrders(
ctx.store,
order,
);
} else {
console.error("Unsupported spec");
}
}
}
src/eventprocessor/market/orderFilledEventProcessor.ts
OrderFilledEventProcessor
to handle new fields in the orderFilled
event.import { EventProcessor } from "../eventProcessor";
import { Store } from "@subsquid/typeorm-store";
import { Order, OrderStatus, Position, OrderSide } from "../../model";
import { AggregatedOrdersHandler } from "./aggregatedOrdersHandler";
import {
DataHandlerContext,
Event,
Block,
Call,
} from "@subsquid/substrate-processor";
import * as events from "../../types/events";
export class OrderFilledEventProcessor implements EventProcessor {
getHandledEventName(): string {
return "Market.OrderFilled";
}
async process(
ctx: DataHandlerContext<Store, any>,
block: Block<any>,
event: Event,
) {
const orderFilledEvent = events.market.orderFilled.v2;
if (orderFilledEvent.is(event)) {
let parsedEvent = orderFilledEvent.decode(event);
let order = await ctx.store.findOne(Order, {
where: { id: parsedEvent.orderId.toString() },
relations: { market: true },
});
if (order) {
await AggregatedOrdersHandler.removeOrderFromAggregatedOrders(
ctx.store,
order,
);
if (order.type.isTypeOf === "ClosingOrder") {
let offsetingPositionId = order.type.value;
let offsetingPosition = await ctx.store.findOne(Position, {
where: { id: offsetingPositionId.toString() },
});
if (offsetingPosition) {
const positionCreatedEvent = block.events.find(
(element) =>
element.index < event.index &&
event.index - element.index <= 4 &&
element.callAddress?.toString() ===
event.callAddress?.toString() &&
element.name === "Market.PositionCreated",
);
if (positionCreatedEvent) {
let newPosition = await ctx.store.findOne(Position, {
where: { id: positionCreatedEvent?.args.positionId.toString() },
});
if (newPosition) {
if (order.side === OrderSide.LONG) {
newPosition.createPriceLong =
offsetingPosition.createPriceLong;
} else {
newPosition.createPriceShort =
offsetingPosition.createPriceShort;
}
await ctx.store.save(newPosition);
}
}
} else {
console.warn("Position not found");
}
}
order.status = OrderStatus.COMPLETED;
order.quantity = BigInt(0);
order.filledPrice = parsedEvent.filledPrice;
await ctx.store.save(order);
}
} else {
console.error("Unsupported spec");
}
}
}
Click here to create a Pull Request with the proposed solution
Files used for this task: