Pass file/offset information into mapper #71
Description
I found myself needing to perform data deduplication for the update hotfix data. (Yes, I know incremental upload works better with map reduce.)
My ideal deduplication mechanism is:
- Collect the upload date of all records with the same ID
- Throw away all but the most recent record
i.e. last write wins
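
As a minimal sketch (the field names are illustrative, not telemetry-server's actual record schema), last write wins amounts to keeping the record with the maximum upload date per ID:

```python
# Minimal last-write-wins sketch; field names are illustrative only.
def last_write_wins(records):
    """records: iterable of (record_id, upload_date, payload) tuples."""
    latest = {}
    for record_id, upload_date, payload in records:
        kept = latest.get(record_id)
        if kept is None or upload_date > kept[0]:
            latest[record_id] = (upload_date, payload)
    return latest
```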
It is difficult to do this in telemetry-server today if N>1 mappers are involved, because the mapper receives no metadata about upload time. Call this a deficiency in how the hotfix data stream is constructed if you want. You can do it with N=1 mapper, but that is absurdly slow (I have multiple cores and I want to use them, dammit). Alternatively, you can write out the original records in map() and have the reducer produce essentially new streams from the filtered output, but that involves tons of extra CPU and I/O. Why should I rewrite a multi-gigabyte data set when the duplication rate is low?
I propose passing the source file/offset information into map(). In my dedupe job, I can then write out the record ID along with the file/offset information. A reducer can find IDs with multiple records and produce an output listing either the "good" or "bad" sets of file/offsets. I can then load this file into another MR job and filter incoming records against it. You pay a penalty for a hash lookup on each record, but that should be fast if a set is used.
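
A hedged sketch of what the two phases might look like. The filename/offset parameters to map() are the extension proposed here, and the function signatures and context.write() calls are assumptions for illustration, not telemetry-server's actual job API:

```python
# Phase 1 mapper: emit location metadata keyed by record ID.
def dedupe_map(record_id, upload_date, value, context, filename, offset):
    context.write(record_id, (upload_date, filename, offset))

# Phase 1 reducer: for duplicated IDs, emit every location except the
# most recent one as "bad".
def dedupe_reduce(record_id, locations, context):
    if len(locations) > 1:
        locations.sort()  # sorts by upload_date first; winner ends up last
        for upload_date, filename, offset in locations[:-1]:
            context.write((filename, offset), "bad")

# Phase 2 mapper: skip records whose (file, offset) appears in the bad set.
bad_locations = set()  # loaded from phase 1 output at job startup
def filter_map(record_id, upload_date, value, context, filename, offset):
    if (filename, offset) not in bad_locations:
        context.write(record_id, value)
```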
I've already hacked up telemetry-server to pass this extra information to map(), but it breaks the map() API. The next step is likely to hook up inspect.getargspec() to see whether the callee supports the new arguments and pass them only if it does. But I wanted to get feedback from people before I fully implement this, as the changes are probably a bit controversial.
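
One way that dispatch might look (inspect.getargspec() is the Python 2-era API; the parameter names and the generic map signature here are hypothetical):

```python
import inspect

def invoke_map(map_func, key, value, context, filename, offset):
    # Inspect the user's map() signature and pass file/offset only if the
    # function declares parameters for them, so existing mappers that use
    # the old signature keep working unchanged.
    args = inspect.getargspec(map_func).args
    if "filename" in args and "offset" in args:
        return map_func(key, value, context, filename=filename, offset=offset)
    return map_func(key, value, context)
```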