In pipe 2.5, this code ran over sets of messages that all shared a single seg_id. In pipe 3.0 the message stream contains multiple seg_ids, so the seg_id has to be updated for each message.
Also, fix a couple of minor issues I ran across:
Fix misnamed variable (seg_id -> identity), left over from a previous version when seg_id was the only identity info.
Remove unused import