Skip to content

Commit

Permalink
fix(MergeLineContext): Partially remove unused service ids
Browse files Browse the repository at this point in the history
  • Loading branch information
binh-dam-ibigroup committed Nov 24, 2021
1 parent 6ea5968 commit 942e3e6
Show file tree
Hide file tree
Showing 12 changed files with 195 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.bson.codecs.pojo.annotations.BsonIgnore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -87,6 +88,10 @@ public class MergeFeedsJob extends FeedSourceJob {
public Set<String> serviceIdsToCloneRenameAndExtend = new HashSet<>();
@JsonIgnore @BsonIgnore
public Set<String> serviceIdsFromActiveFeedToTerminateEarly = new HashSet<>();
@JsonIgnore @BsonIgnore
public Set<String> serviceIdsFromActiveFeedToRemove = new HashSet<>();
@JsonIgnore @BsonIgnore
public Set<String> serviceIdsFromFutureFeedToRemove = new HashSet<>();

private List<TripAndCalendars> sharedConsistentTripAndCalendarIds = new ArrayList<>();

Expand Down Expand Up @@ -441,17 +446,40 @@ private void determineMergeStrategy() {
getActiveServiceIds(feedMergeContext.getActiveTripIdsNotInFutureFeed())
);

// Build the set of calendar ids from the future feed to be removed
// because they are no longer used once shared trips are remapped to another service id.
serviceIdsFromFutureFeedToRemove = Sets.difference(
feedMergeContext.future.feedToMerge.serviceIds,
getFutureServiceIds(feedMergeContext.getFutureTripIdsNotInActiveFeed())
);

// Build the set of calendar ids from the active feed to be removed
// because they are no longer used once shared trips are remapped to another service id.
serviceIdsFromActiveFeedToRemove = Sets.difference(
feedMergeContext.active.feedToMerge.serviceIds,
getActiveServiceIds(feedMergeContext.getActiveTripIdsNotInFutureFeed())
);

mergeFeedsResult.mergeStrategy = CHECK_STOP_TIMES;
}
}

/**
* Obtains the service ids corresponding to the provided trip ids.
*/
private List<String> getActiveServiceIds(Set<String> tripIds) {
private Set<String> getActiveServiceIds(Set<String> tripIds) {
return tripIds.stream()
.map(tripId -> feedMergeContext.active.feed.trips.get(tripId).service_id)
.collect(Collectors.toList());
.collect(Collectors.toSet());
}

/**
 * Collects the distinct service ids referenced by the given trips, looking each trip up
 * in the future feed.
 *
 * @param tripIds ids of the trips to look up in the future feed.
 * @return the set of service ids used by those trips (duplicates are collapsed).
 */
private Set<String> getFutureServiceIds(Set<String> tripIds) {
    Set<String> futureServiceIds = new HashSet<>();
    for (String tripId : tripIds) {
        // Each trip carries the service id of the calendar entry it runs on.
        futureServiceIds.add(feedMergeContext.future.feed.trips.get(tripId).service_id);
    }
    return futureServiceIds;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,27 @@ private boolean checkCalendarIds(Set<NewGTFSError> idErrors, FieldContext fieldC
.format(GTFS_DATE_FORMATTER));
}
}
/* } else {

// Remove calendar entries that are no longer used.
if (job.serviceIdsFromActiveFeedToRemove.contains(keyValue)) {
LOG.warn(
"Skipping active calendar entry {} because it will become unused in the merged feed.",
keyValue);
mergeFeedsResult.skippedIds.add(key);
shouldSkipRecord = true;
}
} else {
// If handling the future feed, the MTC revised feed merge logic is as follows:
// - Calendar entries from the future feed will be inserted as is in the merged feed.
// so no additional processing needed here, unless the calendar entry is no longer used.
// so no additional processing needed here, unless the calendar entry is no longer used,
// in that case we drop the calendar entry.
if (job.serviceIdsFromFutureFeedToRemove.contains(keyValue)) {
LOG.warn(
"Skipping future calendar entry {} because it will become unused in the merged feed.",
keyValue);
mergeFeedsResult.skippedIds.add(key);
shouldSkipRecord = true;
}
*/
}


Expand Down Expand Up @@ -137,7 +145,7 @@ public void addClonedServiceId() throws IOException {
String originalServiceId = keyValue;
if (job.serviceIdsToCloneRenameAndExtend.contains(originalServiceId)) {
// FIXME: Do we need to worry about calendar_dates?
String[] clonedValues = getRowValues().clone();
String[] clonedValues = getOriginalRowValues().clone();
String newServiceId = clonedValues[keyFieldIndex] = String.join(":", getIdScope(), originalServiceId);
// Modify start date only (preserve the end date from the future calendar entry).
int startDateIndex = Table.CALENDAR.getFieldIndex("start_date");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class MergeLineContext {
private CsvReader csvReader;
private boolean skipRecord;
protected boolean keyFieldMissing;
private String[] originalRowValues;
private String[] rowValues;
private int lineNumber = 0;
protected final Table table;
Expand Down Expand Up @@ -198,6 +199,7 @@ public boolean iterateOverRows() throws IOException {
if (!constructRowValues()) {
return false;
}

finishRowAndWriteToZip();
}
return true;
Expand Down Expand Up @@ -491,6 +493,7 @@ public void initializeRowValues() {
skipRecord = false;
// Reset the row values (this must happen after the first line is checked).
rowValues = new String[sharedSpecFields.size()];
originalRowValues = new String[sharedSpecFields.size()];
}

public void writeValuesToTable(String[] values, boolean incrementLineNumbers) throws IOException {
Expand Down Expand Up @@ -522,6 +525,7 @@ public void writeHeaders() throws IOException {
* @return false, if a failing condition was encountered. true, if everything was ok.
*/
public boolean constructRowValues() throws IOException {
boolean result = true;
// Piece together the row to write, which should look practically identical to the original
// row except for the identifiers receiving a prefix to avoid ID conflicts.
for (int specFieldIndex = 0; specFieldIndex < sharedSpecFields.size(); specFieldIndex++) {
Expand All @@ -533,65 +537,73 @@ public boolean constructRowValues() throws IOException {
field,
csvReader.get(fieldsFoundList.indexOf(field))
);
// Handle filling in agency_id if missing when merging regional feeds. If false is returned,
// the job has encountered a failing condition (the method handles failing the job itself).
if (!updateAgencyIdIfNeeded(fieldContext)) {
return false;
}
// Determine if field is a GTFS identifier (and scope if needed).
scopeValueIfNeeded(fieldContext);
// Only need to check for merge conflicts if using MTC merge type because
// the regional merge type scopes all identifiers by default. Also, the
// reference tracker will get far too large if we attempt to use it to
// track references for a large number of feeds (e.g., every feed in New
// York State).
if (job.mergeType.equals(SERVICE_PERIOD)) {
// Remap service id from active feed to distinguish them
// from entries with the same id in the future feed.
// See https://github.com/ibi-group/datatools-server/issues/244
if (handlingActiveFeed && fieldContext.nameEquals(SERVICE_ID)) {
updateAndRemapOutput(fieldContext);
originalRowValues[specFieldIndex] = fieldContext.getValueToWrite();
if (!skipRecord) {
// Handle filling in agency_id if missing when merging regional feeds. If false is returned,
// the job has encountered a failing condition (the method handles failing the job itself).
if (!updateAgencyIdIfNeeded(fieldContext)) {
result = false;
}
// Determine if field is a GTFS identifier (and scope if needed).
scopeValueIfNeeded(fieldContext);
// Only need to check for merge conflicts if using MTC merge type because
// the regional merge type scopes all identifiers by default. Also, the
// reference tracker will get far too large if we attempt to use it to
// track references for a large number of feeds (e.g., every feed in New
// York State).
if (job.mergeType.equals(SERVICE_PERIOD)) {
// Remap service id from active feed to distinguish them
// from entries with the same id in the future feed.
// See https://github.com/ibi-group/datatools-server/issues/244
if (handlingActiveFeed && fieldContext.nameEquals(SERVICE_ID)) {
updateAndRemapOutput(fieldContext);
}

updateServiceIdsIfNeeded(fieldContext);
updateServiceIdsIfNeeded(fieldContext);

// Store values for key fields that have been encountered and update any key values that need modification due
// to conflicts.
if (!checkFieldsForMergeConflicts(getIdErrors(fieldContext), fieldContext)) {
// Store values for key fields that have been encountered and update any key values that need modification due
// to conflicts.
if (!checkFieldsForMergeConflicts(getIdErrors(fieldContext), fieldContext)) {
skipRecord = true;
continue;
}
}
// If the current field is a foreign reference, check if the reference has been removed in the
// merged result. If this is the case (or other conditions are met), we will need to skip this
// record. Likewise, if the reference has been modified, ensure that the value written to the
// merged result is correctly updated.
if (!checkForeignReferences(fieldContext)) {
skipRecord = true;
break;
continue;
}
rowValues[specFieldIndex] = fieldContext.getValueToWrite();
}
// If the current field is a foreign reference, check if the reference has been removed in the
// merged result. If this is the case (or other conditions are met), we will need to skip this
// record. Likewise, if the reference has been modified, ensure that the value written to the
// merged result is correctly updated.
if (!checkForeignReferences(fieldContext)) {
skipRecord = true;
break;
}
rowValues[specFieldIndex] = fieldContext.getValueToWrite();
}
return true;
return result;
}

public void finishRowAndWriteToZip() throws IOException {
private void finishRowAndWriteToZip() throws IOException {
boolean shouldWriteCurrentRow = true;
// Do not write rows that are designated to be skipped.
if (skipRecord && job.mergeType.equals(SERVICE_PERIOD)) {
mergeFeedsResult.recordsSkipCount++;
return;
shouldWriteCurrentRow = false;
}
// Store row and stop values. If the return value is true, the record has been skipped and we
// should skip writing the row to the merged table.
if (storeRowAndStopValues()) {
return;
shouldWriteCurrentRow = false;
}

// Finally, handle writing lines to zip entry.
if (mergedLineNumber == 0) {
writeHeaders();
}
// Write line to table.
writeValuesToTable(rowValues, true);

if (shouldWriteCurrentRow) {
// Write line to table.
writeValuesToTable(rowValues, true);
}

// Optional table-specific additional processing.
afterRowWrite();
Expand Down Expand Up @@ -631,7 +643,7 @@ protected int getLineNumber() {
return lineNumber;
}

protected String[] getRowValues() { return rowValues; }
protected String[] getOriginalRowValues() { return originalRowValues; }

/**
* Retrieves the value for the specified CSV field.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public class MergeFeedsJobTest extends UnitTest {
* and some added trips, and a trip from the base feed removed.
*/
private static FeedVersion fakeTransitSameSignatureTrips;
/**
* The base feed (transposed to the future dates), with some trip_ids from the base feed with the same signature,
* and a trip from the base feed removed.
*/
private static FeedVersion fakeTransitSameSignatureTrips2;
private static FeedSource bart;
private static FeedVersion noAgencyVersion1;
private static FeedVersion noAgencyVersion2;
Expand Down Expand Up @@ -137,6 +142,7 @@ public static void setUp() throws IOException {
fakeTransitModService = createFeedVersion(fakeTransit, zipFolderFiles("merge-data-mod-services"));
fakeTransitNewSignatureTrips = createFeedVersion(fakeTransit, zipFolderFiles("merge-data-mod-trips"));
fakeTransitSameSignatureTrips = createFeedVersion(fakeTransit, zipFolderFiles("merge-data-added-trips"));
fakeTransitSameSignatureTrips2 = createFeedVersion(fakeTransit, zipFolderFiles("merge-data-added-trips-2"));

// Feeds with no agency id
FeedSource noAgencyIds = new FeedSource("no-agency-ids", project.id, MANUALLY_UPLOADED);
Expand Down Expand Up @@ -345,17 +351,24 @@ void mergeMTCShouldHandleExtendFutureStrategy() throws SQLException {
// assert service_ids start_dates have been extended to the start_date of the base feed.
String mergedNamespace = mergeFeedsJob.mergedVersion.namespace;

// There should be no unused service ids.
assertThatSqlCountQueryYieldsExpectedCount(
String.format("SELECT count(*) FROM %s.errors where error_type = 'SERVICE_UNUSED'", mergedNamespace),
0
);

// - calendar table
// expect a total of 5 records in calendar table
// expect a total of 1 record in calendar table that
// corresponds to the trip ids present in both active and future feed.
assertThatSqlCountQueryYieldsExpectedCount(
String.format("SELECT count(*) FROM %s.calendar", mergedNamespace),
5
1
);
// expect that two records in calendar table have the correct start_date
// (one for the original calendar entry, one for the extended service id for trips with same signature)
assertThatSqlCountQueryYieldsExpectedCount(
String.format("SELECT count(*) FROM %s.calendar where start_date = '20170918' and monday = 1", mergedNamespace),
2
1
);
}

Expand Down Expand Up @@ -432,6 +445,60 @@ void mergeMTCShouldHandleMatchingTripIdsWithSameSignature() throws SQLException
);
}

/**
* Ensures that an MTC merge of feeds with exact matches of service_ids and trip_ids,
* where the matching trip ids have the same signature (same stop times), utilizes the
* {@link MergeStrategy#CHECK_STOP_TIMES} strategy correctly and drops unused future service ids.
* The merged output is checked through row counts on the calendar, trips and errors tables.
*/
@Test
void mergeMTCShouldHandleMatchingTripIdsAndDropUnusedFutureCalendar() throws SQLException {
Set<FeedVersion> versions = new HashSet<>();
versions.add(fakeTransitBase);
versions.add(fakeTransitSameSignatureTrips2);
MergeFeedsJob mergeFeedsJob = new MergeFeedsJob(user, versions, "merged_output", MergeFeedsType.SERVICE_PERIOD);
// Run the job in this thread (we're not concerned about concurrency here).
mergeFeedsJob.run();
// Check that correct strategy was used.
assertEquals(
MergeStrategy.CHECK_STOP_TIMES,
mergeFeedsJob.mergeFeedsResult.mergeStrategy
);
// Result should succeed.
assertFalse(
mergeFeedsJob.mergeFeedsResult.failed,
"Merge feeds job should succeed with CHECK_STOP_TIMES strategy."
);

// All remaining checks query the tables of the merged feed version.
String mergedNamespace = mergeFeedsJob.mergedVersion.namespace;

// - calendar table
// The assertion below expects a total of 3 records in calendar table.
// NOTE(review): the candidate entries listed here add up to 4, not 3 - presumably one of them
// is dropped as unused in the merged feed; confirm which one and trim this list accordingly:
// - common_id from the active feed (but start date is changed to one day before first start_date in future feed),
// - common_id from the future feed (because of one future trip not in the active feed),
// - common_id cloned and extended for the matching trip id present in both active and future feeds
// (from MergeFeedsJob#serviceIdsToCloneRenameAndExtend),
// - only_calendar_id used in the future feed.
assertThatSqlCountQueryYieldsExpectedCount(
String.format("SELECT count(*) FROM %s.calendar", mergedNamespace),
3
);

// Out of all trips from the input datasets, the assertion below expects 3 trips in merged output.
// NOTE(review): the breakdown listed here adds up to 4, not 3 - confirm the actual breakdown
// against the fixture trips.txt files:
// 1 trip from active feed that is not in the future feed,
// 1 trip in both the active and future feeds, with the same signature (same stop times),
// 2 trips from the future feed not in the active feed.
assertThatSqlCountQueryYieldsExpectedCount(
String.format("SELECT count(*) FROM %s.trips", mergedNamespace),
3
);

// There should be no unused service ids.
assertThatSqlCountQueryYieldsExpectedCount(
String.format("SELECT count(*) FROM %s.errors where error_type = 'SERVICE_UNUSED'", mergedNamespace),
0
);
}

/**
* Ensures that an MTC merge of feeds with trip_ids matching in the active and future feed,
* but with different signatures (e.g. different stop times) fails.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
agency_id,agency_name,agency_url,agency_lang,agency_phone,agency_email,agency_timezone,agency_fare_url,agency_branding_url
1,Fake Transit,,,,,America/Los_Angeles,,
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
service_id,monday,tuesday,wednesday,thursday,friday,saturday,sunday,start_date,end_date
common_id,1,1,1,1,1,1,1,20170923,20170925
only_calendar_id,1,1,1,1,1,1,1,20170920,20170927
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
feed_id,feed_publisher_name,feed_publisher_url,feed_lang,feed_version
fake_transit,Conveyal,http://www.conveyal.com,en,1.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
agency_id,route_id,route_short_name,route_long_name,route_desc,route_type,route_url,route_color,route_text_color,route_branding_url
1,1,1,Route 1,,3,,7CE6E7,FFFFFF,
1,2,2,Route 2,,3,,7CE6E7,FFFFFF,
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
stop_id,accessibility_id,cardinal_direction,relative_position,stop_city
4u6g,0,SE,FS,Scotts Valley
johv,0,SE,FS,Scotts Valley
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
trip_id,arrival_time,departure_time,stop_id,stop_sequence,stop_headsign,pickup_type,drop_off_type,shape_dist_traveled,timepoint
trip3,07:00:00,07:00:00,4u6g,1,,0,0,0.0000000,
trip3,07:01:00,07:01:00,johv,2,,0,0,341.4491961,
only-calendar-trip1,07:00:00,07:00:00,4u6g,1,,0,0,0.0000000,
only-calendar-trip1,07:01:00,07:01:00,johv,2,,0,0,341.4491961,
only-calendar-trip2,07:00:00,07:00:00,johv,1,,0,0,0.0000000,
only-calendar-trip2,07:01:00,07:01:00,4u6g,2,,0,0,341.4491961,
only-calendar-trip999,07:00:00,07:00:00,johv,1,,0,0,0.0000000,
only-calendar-trip999,07:01:00,07:01:00,4u6g,2,,0,0,341.4491961,
Loading

0 comments on commit 942e3e6

Please sign in to comment.