-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPhysicalcu.cpp
More file actions
139 lines (129 loc) · 4.96 KB
/
Physicalcu.cpp
File metadata and controls
139 lines (129 loc) · 4.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/*
* _____ _ ____ ____
* / ___/_____(_) __ \/ __ )
* \__ \/ ___/ / / / / __ |
* ___/ / /__/ / /_/ / /_/ /
* /____/\___/_/_____/_____/
*
*
* BEGIN_COPYRIGHT
*
* This file is part of SciDB.
* Copyright (C) 2008-2014 SciDB, Inc.
*
* SciDB is free software: you can redistribute it and/or modify
* it under the terms of the AFFERO GNU General Public License as published by
* the Free Software Foundation.
*
* SciDB is distributed "AS-IS" AND WITHOUT ANY WARRANTY OF ANY KIND,
* INCLUDING ANY IMPLIED WARRANTY OF MERCHANTABILITY,
* NON-INFRINGEMENT, OR FITNESS FOR A PARTICULAR PURPOSE. See
* the AFFERO GNU General Public License for the complete license terms.
*
* You should have received a copy of the AFFERO GNU General Public License
* along with SciDB. If not, see <http://www.gnu.org/licenses/agpl-3.0.html>
*
* END_COPYRIGHT
*/
#include "query/Operator.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <new>
#include <string>
#include <vector>
/*
 * qsort(3) comparator for an array of C strings.
 *
 * qsort passes pointers to the array elements, and each element is itself a
 * "char *", so each argument is dereferenced once to obtain the string before
 * the two strings are ordered with strcmp(3).
 */
static int
cmpstringp (const void *p1, const void *p2)
{
    const char *lhs = *static_cast<char *const *>(p1);
    const char *rhs = *static_cast<char *const *>(p2);
    return strcmp (lhs, rhs);
}
namespace scidb
{
/**
 * Physical implementation of the "cu" operator (registered at the bottom of
 * this namespace).
 *
 * For each chunk of attribute 0 of the first input array:
 * - The non-empty cell values are read as strings.
 * - They are sorted with strcmp ordering.
 * - They are written into a new chunk of an in-memory output array that has
 *   the same schema as the input.
 *
 * Sorted values are written in the output chunk iterator's traversal order.
 * A value equal to the one written just before it is skipped, and its position
 * is left unwritten. So each distinct value of a chunk is written once, as long
 * as the output chunk iterator does not reach its end first.
 */
class Physicalcu : public PhysicalOperator
{
public:
    Physicalcu(string const& logicalName,
               string const& physicalName,
               Parameters const& parameters,
               ArrayDesc const& schema):
        PhysicalOperator(logicalName, physicalName, parameters, schema)
    {}

    /**
     * [Optimizer API] Report the distribution of the output array.
     * Output chunks are produced only from the chunks each instance already
     * holds, so the distribution of the first input is passed through.
     * @param inputDistributions distributions of the inputs; only [0] is used.
     * @param inputSchemas schemas of the inputs (unused).
     * @return inputDistributions[0]
     */
    virtual ArrayDistribution getOutputDistribution(vector<ArrayDistribution> const& inputDistributions,
                                                    vector<ArrayDesc> const& inputSchemas) const
    {
        return inputDistributions[0];
    }

    /**
     * [Optimizer API] Determine if operator changes result chunk distribution.
     * @param sourceSchemas shapes of all arrays that will be given as inputs.
     * @return always false: each chunk is rewritten on the instance that
     *         iterates over it.
     */
    virtual bool changesDistribution(std::vector<ArrayDesc> const& sourceSchemas) const
    {
        return false;
    }

    /**
     * The instance-parallel 'main' routine of this operator. It runs on each
     * instance in the SciDB cluster, over the chunks that instance's input
     * iterator yields, and returns a shared pointer to the resulting Array.
     *
     * @param inputArrays input arrays; only attribute 0 of inputArrays[0] is read.
     * @param query       query context handed to the output MemArray and to
     *                    the output chunk iterators.
     * @return a MemArray with the input's schema holding, per chunk, the
     *         sorted distinct string values of that chunk.
     * @throws the plugin user exception "chunk unique malloc error" if the
     *         per-chunk sort buffer cannot be allocated.
     */
    shared_ptr< Array> execute(vector< shared_ptr< Array> >& inputArrays, shared_ptr<Query> query)
    {
        shared_ptr<Array> inputArray = inputArrays[0];
        shared_ptr<ConstArrayIterator> arrayIter(inputArray->getConstIterator(0));
        shared_ptr<ConstChunkIterator> chunkIter;
        shared_ptr<Array> output(new MemArray(inputArray->getArrayDesc(), query));
        shared_ptr<ArrayIterator> outputArrayIterator(output->getIterator(0));

        // Iterate over each chunk owned by this instance.
        while (!arrayIter->end())
        {
            chunkIter = arrayIter->getChunk().getConstIterator(ChunkIterator::IGNORE_EMPTY_CELLS);
            if (chunkIter->end())
            {
                // No non-empty cells in this chunk: nothing to write.
                ++(*arrayIter);
                continue;
            }

            // Position of the first non-empty cell; it addresses the output chunk below.
            // NOTE(review): this relies on newChunk() accepting any cell position
            // inside the chunk. Confirm, or use arrayIter->getPosition() instead.
            Coordinates start = chunkIter->getPosition();

            // Copy the chunk's values out as owned strings.
            vector<string> chunkdata;
            do
            {
                Value const& cell = chunkIter->getItem();
                // NOTE(review): null cells are not special-cased here. Confirm
                // that Value::getString() is safe to call on a null Value.
                chunkdata.push_back(string(cell.getString()));
                ++(*chunkIter);
            } while (!chunkIter->end());

            // Sort pointers into chunkdata rather than the strings themselves.
            // The vector owns this buffer, so it is released on every exit path,
            // including exceptions thrown while creating or writing the output chunk.
            vector<const char*> sorted;
            try
            {
                sorted.resize(chunkdata.size());
            }
            catch (std::bad_alloc const&)
            {
                throw PLUGIN_USER_EXCEPTION("chunk unique malloc error", SCIDB_SE_UDO, SCIDB_USER_ERROR_CODE_START);
            }
            for (size_t k = 0; k < chunkdata.size(); ++k)
            {
                sorted[k] = chunkdata[k].c_str();
            }
            // chunkdata holds at least one value here, so &sorted[0] is valid.
            qsort(&sorted[0], sorted.size(), sizeof(const char*), cmpstringp);

            // Write the output (same schema as input). A value equal to the
            // previously written one is skipped, so its position is left unwritten.
            shared_ptr<ChunkIterator> outputChunkIter =
                outputArrayIterator->newChunk(start).getIterator(query, ChunkIterator::SEQUENTIAL_WRITE);
            Value val;
            const char* previous = sorted[0];
            size_t j = 0;
            while (!outputChunkIter->end() && j < sorted.size())
            {
                if (j == 0 || strcmp(previous, sorted[j]) != 0)
                {
                    // +1 so the terminating NUL is stored with the string.
                    val.setData(sorted[j], strlen(sorted[j]) + 1);
                    previous = sorted[j];
                    outputChunkIter->writeItem(val);
                }
                ++j;
                ++(*outputChunkIter);
            }
            outputChunkIter->flush();
            outputChunkIter->reset();

            // Advance to the next input chunk. The output iterator is advanced
            // too, although every output chunk is created at an explicit position
            // via newChunk(start).
            ++(*arrayIter);
            ++(*outputArrayIterator);
        }
        outputArrayIterator->reset();
        return output;
    }
};

// Register Physicalcu as the physical implementation named "Physicalcu"
// of the logical operator "cu".
REGISTER_PHYSICAL_OPERATOR_FACTORY(Physicalcu, "cu", "Physicalcu");
} //namespace scidb